Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,18 @@ public class Config extends ConfigBase {
@ConfField(masterOnly = true)
public static boolean enable_hms_events_incremental_sync = false;

/**
* If set to true, doris will try to parse the ddl of a hive view and try to execute the query
* otherwise it will throw an AnalysisException.
*/
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL, description = {
"当前默认设置为 false,开启后支持使用新优化器的load语句导入数据,失败后会降级旧的load语句。",
"Now default set to true, After this function is enabled, the load statement of "
+ "the new optimizer can be used to import data. If this function fails, "
+ "the old load statement will be degraded."})
public static boolean enable_nereids_load = false;


/**
* Maximum number of events to poll in each RPC.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ statement
(withRemoteStorageSystem)?
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #load
| LOAD LABEL lableName=identifier
LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
resourceDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #resourceLoad
| LOAD mysqlDataDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #mysqlLoad
Expand Down Expand Up @@ -131,7 +126,7 @@ dataDesc
(PARTITION partition=identifierList)?
(COLUMNS TERMINATED BY comma=STRING_LITERAL)?
(LINES TERMINATED BY separator=STRING_LITERAL)?
(FORMAT AS format=identifier)?
(FORMAT AS format=identifierOrStringLiteral)?
(columns=identifierList)?
(columnsFromPath=colFromPath)?
(columnMapping=colMappingList)?
Expand Down Expand Up @@ -167,6 +162,11 @@ refreshMethod
: COMPLETE | AUTO
;

identifierOrStringLiteral
: identifier
| STRING_LITERAL
;

identifierOrText
: errorCapturingIdentifier
| STRING_LITERAL
Expand Down Expand Up @@ -224,7 +224,8 @@ mappingExpr
;

withRemoteStorageSystem
: WITH S3 LEFT_PAREN
: resourceDesc
| WITH S3 LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN
| WITH HDFS LEFT_PAREN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ public void analyze(Analyzer analyzer) throws UserException {
analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate jobInstance
InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
job.setCreateTimeMs(System.currentTimeMillis());
TimerDefinition timerDefinition = new TimerDefinition();

if (null != onceJobStartTimestamp) {
Expand Down Expand Up @@ -148,17 +146,19 @@ public void analyze(Analyzer analyzer) throws UserException {
}
checkJobName(labelName.getLabelName());
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setJobConfig(jobExecutionConfiguration);

job.setComment(comment);
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
job.setJobStatus(JobStatus.RUNNING);
job.setJobId(Env.getCurrentEnv().getNextId());
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
job.setExecuteSql(executeSql);
// create job use label name as its job name
String jobName = labelName.getLabelName();
InsertJob job = new InsertJob(jobName,
JobStatus.RUNNING,
labelName.getDbName(),
comment,
ConnectContext.get().getCurrentUserIdentity(),
jobExecutionConfiguration,
System.currentTimeMillis(),
executeSql);
//job.checkJobParams();
jobInstance = job;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ public Set<JobState> getStates() {
return states;
}

public org.apache.doris.load.loadv2.JobState getStateV2() {
if (Strings.isNullOrEmpty(stateValue)) {
return null;
}
return org.apache.doris.load.loadv2.JobState.valueOf(stateValue);
}

public boolean isAccurateMatch() {
return isAccurateMatch;
}
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.jobs.load.LabelProcessor;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
Expand Down Expand Up @@ -362,6 +363,7 @@ public class Env {

private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;

private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
Expand Down Expand Up @@ -641,8 +643,11 @@ private Env(boolean isCheckpointCatalog) {
}
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.transientTaskManager = new TransientTaskManager();

this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
this.isElectable = false;
Expand Down Expand Up @@ -3907,6 +3912,10 @@ public JobManager getJobManager() {
return jobManager;
}

public LabelProcessor getLabelProcessor() {
return labelProcessor;
}

public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}
Expand Down
98 changes: 85 additions & 13 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
Expand Down Expand Up @@ -76,14 +78,55 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
private JobExecutionConfiguration jobConfig;

@SerializedName(value = "ctms")
private Long createTimeMs;
private long createTimeMs;

@SerializedName(value = "sql")
String executeSql;
@SerializedName(value = "stm")
private long startTimeMs = -1L;

@SerializedName(value = "ftm")
private long finishTimeMs;

@SerializedName(value = "sql")
String executeSql;

public AbstractJob() {}

public AbstractJob(Long id) {
jobId = id;
}

/**
* executeSql and runningTasks is not required for load.
*/
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
Comment thread
wsjz marked this conversation as resolved.
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
}

public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
List<T> runningTasks) {
this.jobId = jobId;
this.jobName = jobName;
this.jobStatus = jobStatus;
this.currentDbName = currentDbName;
this.comment = comment;
this.createUser = createUser;
this.jobConfig = jobConfig;
this.createTimeMs = createTimeMs;
this.executeSql = executeSql;
this.runningTasks = runningTasks;
}

private List<T> runningTasks = new ArrayList<>();

@Override
Expand All @@ -109,6 +152,10 @@ public void cancelAllTasks() throws JobException {
.add("Comment")
.build();

protected static long getNextJobId() {
return System.nanoTime() + RandomUtils.nextInt();
}

@Override
public void cancelTaskById(long taskId) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
Expand Down Expand Up @@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
return createTasks(taskType, taskContext);
}

public void initTasks(List<? extends AbstractTask> tasks) {
public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
if (CollectionUtils.isEmpty(getRunningTasks())) {
runningTasks = new ArrayList<>();
}
tasks.forEach(task -> {
task.setJobId(jobId);
task.setTaskId(getNextId());
task.setTaskType(taskType);
task.setJobId(getJobId());
Comment thread
wsjz marked this conversation as resolved.
task.setCreateTimeMs(System.currentTimeMillis());
task.setStatus(TaskStatus.PENDING);
});
if (CollectionUtils.isEmpty(getRunningTasks())) {
setRunningTasks(new ArrayList<>());
}
getRunningTasks().addAll((Collection<? extends T>) tasks);
getRunningTasks().addAll(tasks);
this.startTimeMs = System.currentTimeMillis();
}

public void checkJobParams() {
Expand Down Expand Up @@ -208,10 +256,22 @@ public void updateJobStatus(JobStatus newJobStatus) throws JobException {
public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.setRunningTasks(new ArrayList<>());
job.runningTasks = new ArrayList<>();
return job;
}

public void logCreateOperation() {
Env.getCurrentEnv().getEditLog().logCreateJob(this);
}

public void logFinalOperation() {
Env.getCurrentEnv().getEditLog().logEndJob(this);
}

public void logUpdateOperation() {
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}

@Override
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
Expand Down Expand Up @@ -303,7 +363,19 @@ public ShowResultSetMetaData getJobMetaData() {
return builder.build();
}

private static long getNextId() {
Comment thread
wsjz marked this conversation as resolved.
return System.nanoTime() + RandomUtils.nextInt();
@Override
public void onRegister() throws JobException {}

@Override
public void onUnRegister() throws JobException {}

@Override
public void onReplayCreate() throws JobException {
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay create scheduler job").build());
}

@Override
public void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException {
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build());
}
}
25 changes: 24 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {

/**
* Cancels all running tasks of this job.
*
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;

/**
* register job
* @throws JobException If register job failed.
*/
void onRegister() throws JobException;

/**
* register job failed
* @throws JobException If failed.
*/
void onUnRegister() throws JobException;

/**
* replay create job
* @throws JobException If replay create failed.
*/
void onReplayCreate() throws JobException;

/**
* replay finished or cancelled job
* @throws JobException If replay end failed.
*/
void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException;

/**
* Notifies the job when a task execution fails.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum JobExecuteType {
*/
MANUAL,
/**
* The job will be executed immediately.
* The job will be executed only once and immediately.
*/
INSTANT,
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public enum JobStatus {
* The stop state cannot be resumed
*/
STOPPED,

/**
* When the task is finished, the finished state will be triggered.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

public enum JobType {
INSERT,
MV
MV,
}
Loading