Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 88 additions & 52 deletions fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class LoadAction extends RestBaseController {

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT)
public Object load(HttpServletRequest request, HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -102,21 +102,29 @@ public Object load(HttpServletRequest request, HttpServletResponse response,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT)
public Object streamLoad(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
groupCommit = true;
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
Comment thread
Yukang-Lian marked this conversation as resolved.
}
if (needRedirect(request.getScheme())) {
Expand Down Expand Up @@ -147,21 +155,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
try {
groupCommit = true;
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();

// async mode needs to write WAL, we need to block load during waiting WAL.
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
}

}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
executeCheckPassword(request, response);
Expand Down Expand Up @@ -223,8 +242,8 @@ private String[] parseDbAndTb(String sql) throws Exception {

@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
LOG.info("streamload action 2PC, db: {}, headers: {}", db, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
Expand All @@ -236,9 +255,9 @@ public Object streamLoad2PC(HttpServletRequest request,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC_table(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
Expand Down Expand Up @@ -382,7 +401,7 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea
if (Strings.isNullOrEmpty(cloudClusterName)) {
throw new LoadException("No cloud cluster name selected.");
}
return selectCloudRedirectBackend(cloudClusterName, request, groupCommit);
return selectCloudRedirectBackend(cloudClusterName, request, groupCommit, tableId);
} else {
return selectLocalRedirectBackend(groupCommit, request, tableId);
}
Expand All @@ -409,21 +428,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
if (groupCommit) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(request.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, false);
} catch (DdlException e) {
throw new RuntimeException(e);
}
backend = selectBackendForGroupCommit("", request, tableId, false);
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
}
Expand All @@ -433,17 +438,23 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}

private TNetworkAddress selectCloudRedirectBackend(String clusterName, HttpServletRequest req, boolean groupCommit)
private TNetworkAddress selectCloudRedirectBackend(String clusterName, HttpServletRequest req, boolean groupCommit,
long tableId)
throws LoadException {
Backend backend = StreamLoadHandler.selectBackend(clusterName, groupCommit);
Backend backend = null;
if (groupCommit) {
backend = selectBackendForGroupCommit(clusterName, req, tableId, true);
} else {
backend = StreamLoadHandler.selectBackend(clusterName);
}

String redirectPolicy = req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
// User specified redirect policy
if (redirectPolicy != null && redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE)) {
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
? Config.streamload_redirect_policy : redirectPolicy;
? Config.streamload_redirect_policy : redirectPolicy;

Pair<String, Integer> publicHostPort = null;
Pair<String, Integer> privateHostPort = null;
Expand Down Expand Up @@ -563,7 +574,7 @@ private boolean checkClusterToken(String token) {
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String db,
String table, boolean isStreamLoad) {
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
Expand Down Expand Up @@ -647,4 +658,29 @@ private String getAllHeaders(HttpServletRequest request) {
}
return headers.toString();
}

private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId,
boolean isCloud)
throws LoadException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(req.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();
if (isCloud) {
ctx.setCloudCluster(clusterName);
}

Backend backend = null;
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, isCloud);
} catch (DdlException e) {
throw new LoadException(e.getMessage(), e);
}
return backend;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ public StreamLoadHandler(TStreamLoadPutRequest request, AtomicInteger indexId,
* Select a random backend in the given cloud cluster.
*
* @param clusterName cloud cluster name
* @param groupCommit if this selection is for group commit
* @throws LoadException if there is no available backend
*/
public static Backend selectBackend(String clusterName, boolean groupCommit) throws LoadException {
public static Backend selectBackend(String clusterName) throws LoadException {
List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getBackendsByClusterName(clusterName)
.stream().filter(be -> be.isAlive() && (!groupCommit || groupCommit && !be.isDecommissioned()))
.stream().filter(Backend::isAlive)
.collect(Collectors.toList());

if (backends.isEmpty()) {
Expand All @@ -101,8 +100,7 @@ public static Backend selectBackend(String clusterName, boolean groupCommit) thr
// TODO: add a more sophisticated algorithm to select backend
SecureRandom rand = new SecureRandom();
int randomIndex = rand.nextInt(backends.size());
Backend backend = backends.get(randomIndex);
return backend;
return backends.get(randomIndex);
}

public void setCloudCluster() throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public HttpPut generateRequestForMySqlLoad(
private String selectBackendForMySqlLoad(String database, String table) throws LoadException {
Backend backend = null;
if (Config.isCloudMode()) {
backend = StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster(), false);
backend = StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster());
} else {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
Expand Down