//根据配置对象获取manager对象
executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
//创建一个大小为1的线程池,这个线程池中周期性的调度查看是否有可执行的任务。
fetcherPool = Executors.newScheduledThreadPool(1);
//真正调度任务执行的线程池的大小,默认为10个,使用的队列是无最大上限的。
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
//所有正在执行的任务都会保存在context中。
context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
//从元数据库中获取所有的READY状态的任务,置为ERROR,以供接下来重新调度执行。
for (AbstractExecutable executable : executableManager.getAllExecutables()) {
if (executable.getStatus() == ExecutableState.READY) {
executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status");
}
}
//将所有RUNNING状态的Job设置为ERROR,以供重新调度。
executableManager.updateAllRunningJobsToError();
//进程退出的时候销毁两个线程池,释放zookeeper上的锁
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
logger.debug("Closing zk connection");
try {
shutdown();
} catch (SchedulerException e) {
logger.error("error shutdown scheduler", e);
}
}
});
//FetcherRunner线程是周期性的查看其它任务是否可执行的线程,第一次调度的时延为10秒,接下来60秒调度一次。
fetcher = new FetcherRunner();
fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
hasStarted = true;
private class JobRunner implements Runnable {
private final AbstractExecutable executable;
public JobRunner(AbstractExecutable executable) {
this.executable = executable;
}
@Override
public void run() {
try {
//执行job的处理函数
executable.execute(context);
//执行完成之后触发下一次任务的查询,而不是等到下一个60秒。
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
} catch (ExecuteException e) {
logger.error("ExecuteException job:" + executable.getId(), e);
} catch (Exception e) {
logger.error("unknown error execute job:" + executable.getId(), e);
} finally {
context.removeRunningJob(executable);
}
}
}
public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException {
//print a eye-catching title in log
LogTitlePrinter.printTitle(this.getName());
Preconditions.checkArgument(executableContext instanceof DefaultContext);
ExecuteResult result;
try {
onExecuteStart(executableContext);
result = doWork(executableContext);
} catch (Throwable e) {
logger.error("error running Executable", e);
onExecuteError(e, executableContext);
throw new ExecuteException(e);
}
onExecuteFinished(result, executableContext);
return result;
} @Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
List<? extends Executable> executables = getTasks();
final int size = executables.size();
for (int i = 0; i < size; ++i) {
Executable subTask = executables.get(i);
if (subTask.isRunnable()) {
return subTask.execute(context);
}
}
return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
}
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/yu616568/article/details/48104623