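As mentioned earlier, QuartzSchedulerThread only really starts working once paused is set to false. This article picks up where the QuartzSchedulerThread startup sections of "Quartz and Spring Integration: Creating the Scheduler" and "Quartz and Spring Integration: Starting the Scheduler" left off. The next thing the run method of QuartzSchedulerThread does is ask the thread pool how many threads are available: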
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
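To make the blocking behaviour concrete, here is a minimal sketch of how a pool could implement such a call. The class TinyPool, its fields and its release method are hypothetical and are not Quartz's thread pool; the only thing taken from the text is that the caller blocks until at least one worker is free and then receives the number of free workers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical pool used only to illustrate "block until a worker is free". */
public class TinyPool {
    private final Object lock = new Object();
    private final Deque<String> availWorkers = new ArrayDeque<>();

    /** Returns a worker to the pool and wakes up any waiting caller. */
    public void release(String worker) {
        synchronized (lock) {
            availWorkers.addLast(worker);
            lock.notifyAll();
        }
    }

    /** Blocks until at least one worker is available, then returns how many are. */
    public int blockForAvailableThreads() {
        synchronized (lock) {
            while (availWorkers.isEmpty()) {
                try {
                    lock.wait(500); // wake up periodically and re-check the condition
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break; // stop waiting if the caller was interrupted
                }
            }
            return availWorkers.size();
        }
    }

    public static void main(String[] args) throws Exception {
        TinyPool pool = new TinyPool();
        // Release a worker from another thread after a short delay.
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                // nothing to clean up in this demo
            }
            pool.release("worker-1");
        });
        releaser.start();
        int n = pool.blockForAvailableThreads(); // blocks until the release above happens
        releaser.join();
        System.out.println("available when unblocked: " + n);
    }
}
```

The returned count is what the scheduler thread later uses as an upper bound on how many triggers it asks for in one batch.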
The code that acquires triggers is shown in Listing 1. It calls the JobStore's acquireNextTriggers method to acquire them.
Listing 1
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
// exception handling omitted
} catch (RuntimeException e) {
// exception handling omitted
}
To see what acquireNextTriggers actually does, take LocalDataSourceJobStore, one implementation of JobStore. LocalDataSourceJobStore inherits acquireNextTriggers from its parent class JobStoreSupport (see Listing 2); the method acquires triggers from the data source.
Listing 2
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> fireInstanceIds = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
fireInstanceIds.add(ft.getFireInstanceId());
}
for (OperableTrigger tr : result) {
if (fireInstanceIds.contains(tr.getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
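}
The acquireNextTriggers method of JobStoreSupport mainly delegates to executeInNonManagedTXLock (see Listing 3). It chooses a lock name, passes a TransactionCallback that calls acquireNextTrigger, and passes a TransactionValidator that checks whether any returned trigger has a matching fired-trigger record for this instance. executeInNonManagedTXLock works as follows:
Listing 3
protected <T> T executeInNonManagedTXLock(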
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {
conn = getNonManagedTXConnection();
}
final T result = txCallback.execute(conn);
try {
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
@Override
public Boolean execute(Connection conn) throws JobPersistenceException {
return txValidator.validate(conn, result);
}
})) {
throw e;
}
}
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if(sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
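}
In short, executeInNonManagedTXLock obtains the lock if a lock name was given, runs the callback on a connection, commits, and releases the lock and the connection in the finally block; if the commit fails, the validator decides whether the result took effect anyway. The acquireNextTrigger method invoked by the callback is what actually acquires the triggers. Its steps are described below.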
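Before going through those steps, it may help to see the shape of executeInNonManagedTXLock on its own. The following is a minimal in-memory sketch, not Quartz code: the class TxTemplateSketch, the method executeInLock, and the use of ReentrantLock, Supplier, Runnable and Predicate in place of the database lock, the TransactionCallback, the commit and the TransactionValidator are all assumptions made for illustration. It shows only the control flow: lock, run the callback, commit, and consult the validator when the commit reports a failure.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical illustration of the lock / execute / commit / validate flow. */
public class TxTemplateSketch {
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * @param callback  the work to do while holding the lock
     * @param commit    stand-in for committing; may throw a RuntimeException
     * @param validator decides, after a failed commit, whether the result took effect anyway
     */
    public <T> T executeInLock(Supplier<T> callback, Runnable commit, Predicate<T> validator) {
        lock.lock();                       // 1. obtain the lock
        try {
            T result = callback.get();     // 2. run the callback
            try {
                commit.run();              // 3. try to commit
            } catch (RuntimeException e) {
                // 4. the commit reported a failure: swallow it only if the
                //    validator confirms that the result was applied after all
                if (validator == null || !validator.test(result)) {
                    throw e;
                }
            }
            return result;
        } finally {
            lock.unlock();                 // 5. always release the lock
        }
    }

    public static void main(String[] args) {
        TxTemplateSketch tx = new TxTemplateSketch();
        // The commit fails, but the validator confirms the work, so the result is returned.
        String r = tx.executeInLock(
                () -> "acquired",
                () -> { throw new IllegalStateException("commit failed"); },
                result -> true);
        System.out.println(r); // prints "acquired"
    }
}
```

The validator exists for the case where a commit appears to fail although the database applied it; without that check the caller could not tell the two outcomes apart.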
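First, query the triggers whose state is WAITING (see Listing 4). Taking StdJDBCDelegate as an example, its selectTriggerToAcquire method (plain JDBC API calls, left for the reader to explore) simply runs a SQL query for triggers:
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
The TRIGGER_STATE condition is WAITING. NEXT_FIRE_TIME must be no later than the latest allowed fire time (noLaterThan + timeWindow) and, unless MISFIRE_INSTR is -1, no earlier than the earliest allowed fire time (the value of getMisfireTime()).
Note: in every SQL statement in this article, {0} stands for QRTZ_ and {1} stands for 'schedulerFactoryBean'.
Listing 4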
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
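The {0} and {1} placeholders can be expanded with a few lines of Java. The sketch below assumes they follow java.text.MessageFormat conventions, which the syntax suggests but which this article does not state; the class SqlTemplateSketch and the method expand are hypothetical names.

```java
import java.text.MessageFormat;

/** Hypothetical helper showing how {0} and {1} in the SQL templates could be filled in. */
public class SqlTemplateSketch {

    /** Substitutes {0} with the table prefix and {1} with the quoted scheduler name. */
    public static String expand(String template, String tablePrefix, String schedName) {
        return MessageFormat.format(template, tablePrefix, "'" + schedName + "'");
    }

    public static void main(String[] args) {
        String template = "SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1}"
                + " AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?";
        // prints: SELECT TRIGGER_STATE FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'schedulerFactoryBean' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
        System.out.println(expand(template, "QRTZ_", "schedulerFactoryBean"));
    }
}
```

The remaining ? markers are ordinary JDBC parameters and are bound at execution time, so only the table prefix and scheduler name are baked into the statement text.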
Next, iterate over the TriggerKeys in keys and perform the following steps in each iteration:
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
Internally, retrieveTrigger calls the Delegate's selectTrigger method. Taking StdJDBCDelegate as an example, selectTrigger looks the trigger up by its TriggerKey with the following SQL:
SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
JobKey jobKey = nextTrigger.getJobKey();
JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper());
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
continue; // next trigger
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
Taking StdJDBCDelegate as an example, its selectJobDetail method runs the following SQL:
SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
Taking StdJDBCDelegate as an example, its updateTriggerStateFromOtherState method runs the following SQL, which moves the trigger to ACQUIRED only if it is still WAITING:
UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
Taking StdJDBCDelegate as an example, its insertFiredTrigger method runs the following SQL:
INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
acquiredTriggers.add(nextTrigger);
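The per-trigger steps above can be modelled without a database. The sketch below is an in-memory approximation under stated assumptions: the class AcquireLoopSketch, the TriggerRow type and the string states are hypothetical, a map stands in for the QRTZ_TRIGGERS table, and the fired-trigger insert is omitted. It keeps two ideas from the loop: a job that disallows concurrent execution contributes at most one trigger per batch, and a trigger is taken only if its state is still WAITING.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of the per-trigger acquisition steps. */
public class AcquireLoopSketch {

    /** Minimal stand-in for a trigger row: its job, the job's concurrency rule, its state. */
    static final class TriggerRow {
        final String jobKey;
        final boolean nonConcurrent;
        String state;

        TriggerRow(String jobKey, boolean nonConcurrent, String state) {
            this.jobKey = jobKey;
            this.nonConcurrent = nonConcurrent;
            this.state = state;
        }
    }

    /** Walks the candidate keys and returns the keys that were actually acquired. */
    public static List<String> acquire(Map<String, TriggerRow> table, List<String> keys) {
        List<String> acquired = new ArrayList<>();
        Set<String> nonConcurrentJobsSeen = new HashSet<>();
        for (String key : keys) {
            TriggerRow row = table.get(key);          // look the trigger up by key
            if (row == null) {
                continue;                             // trigger disappeared in the meantime
            }
            if (row.nonConcurrent && !nonConcurrentJobsSeen.add(row.jobKey)) {
                continue;                             // this job already has a trigger in the batch
            }
            if (!"WAITING".equals(row.state)) {
                continue;                             // conditional update would match no row
            }
            row.state = "ACQUIRED";                   // WAITING -> ACQUIRED
            acquired.add(key);
        }
        return acquired;
    }

    /** Builds a small demo table and runs the acquisition over all of its keys. */
    public static List<String> demo() {
        Map<String, TriggerRow> table = new LinkedHashMap<>();
        table.put("t1", new TriggerRow("jobA", true, "WAITING"));
        table.put("t2", new TriggerRow("jobA", true, "WAITING"));  // same non-concurrent job as t1
        table.put("t3", new TriggerRow("jobB", false, "WAITING"));
        table.put("t4", new TriggerRow("jobB", false, "PAUSED"));  // not WAITING, so skipped
        return acquire(table, new ArrayList<>(table.keySet()));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [t1, t3]
    }
}
```

Checking the current state as part of the update is what makes the step safe when several scheduler instances compete for the same trigger: only one of them sees the row change.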
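After acquiring the triggers, QuartzSchedulerThread fires them by calling the JobStore's triggersFired method, as shown in Listing 5.
Listing 5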
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
// exception handling omitted
}
}
To see what triggersFired actually does, again take LocalDataSourceJobStore, the JobStore implementation. It inherits triggersFired from its parent class JobStoreSupport (see Listing 6); this method fires the triggers.
Listing 6
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
new TransactionCallback<List<TriggerFiredResult>>() {
public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
TriggerFiredResult result;
for (OperableTrigger trigger : triggers) {
try {
TriggerFiredBundle bundle = triggerFired(conn, trigger);
result = new TriggerFiredResult(bundle);
} catch (JobPersistenceException jpe) {
result = new TriggerFiredResult(jpe);
} catch(RuntimeException re) {
result = new TriggerFiredResult(re);
}
results.add(result);
}
return results;
}
},
new TransactionValidator<List<TriggerFiredResult>>() {
@Override
public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> executingTriggers = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
if (STATE_EXECUTING.equals(ft.getFireInstanceState())) {
executingTriggers.add(ft.getFireInstanceId());
}
}
for (TriggerFiredResult tr : result) {
if (tr.getTriggerFiredBundle() != null && executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
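}
As you can see, triggersFired also calls executeInNonManagedTXLock. From the earlier analysis we know that this ends up calling back the execute method of the anonymous TransactionCallback class, whose main logic is simply to loop over the triggers list and call triggerFired to obtain a TriggerFiredBundle for each trigger. triggerFired proceeds as follows:
try { // if trigger was deleted, state will be STATE_DELETED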
String state = getDelegate().selectTriggerState(conn,
trigger.getKey());
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't select trigger state: "
+ e.getMessage(), e);
}
Taking StdJDBCDelegate as an example, its selectTriggerState method runs the following SQL:
SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
try {
job = retrieveJob(conn, trigger.getJobKey());
if (job == null) { return null; }
} catch (JobPersistenceException jpe) {
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
getDelegate().updateTriggerState(conn, trigger.getKey(),
STATE_ERROR);
} catch (SQLException sqle) {
getLog().error("Unable to set trigger state to ERROR.", sqle);
}
throw jpe;
}
The retrieveJob method here also ends up calling the Delegate's selectJobDetail method, so it is not covered again.
try {
getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't insert fired trigger: "
+ e.getMessage(), e);
}
The SQL actually executed here is:
UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?
trigger.triggered(cal);
if (job.isConcurrentExectionDisallowed()) {
state = STATE_BLOCKED;
force = false;
try {
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_WAITING);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_ACQUIRED);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_PAUSED_BLOCKED, STATE_PAUSED);
} catch (SQLException e) {
throw new JobPersistenceException(
"Couldn't update states of blocked triggers: "
+ e.getMessage(), e);
}
}
storeTrigger(conn, trigger, job, true, state, force, false);
The storeTrigger method first calls triggerExists to check whether the trigger already exists:
boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
Taking StdJDBCDelegate as an example, its triggerExists method runs the following SQL:
SELECT TRIGGER_NAME FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
Then, depending on the value of existingTrigger, it updates or inserts the trigger's row in the QRTZ_TRIGGERS table, which carries the next fire time:
if (existingTrigger) {
getDelegate().updateTrigger(conn, newTrigger, state, job);
} else {
getDelegate().insertTrigger(conn, newTrigger, state, job);
}
Taking StdJDBCDelegate as an example, the SQL executed in the insert case is:
INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
.equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
QuartzSchedulerThread then iterates over the TriggerFiredBundles and creates a JobRunShell for each job to be run (see Listing 7). Despite its name, a JobRunShell is a Runnable wrapper around the job, not an operating-system shell script.
Listing 7
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
}
The createJobRunShell method called above simply constructs a new JobRunShell:
public JobRunShell createJobRunShell(TriggerFiredBundle bndle) throws SchedulerException {
return new JobRunShell(scheduler, bndle);
}
The JobRunShell is then handed to the thread pool's runInThread method:
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
The implementation of runInThread is shown in Listing 10.
Listing 10
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
The core of the WorkerThread's own run method is:
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
runnable.run();
}
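}
The runnable in the code above is the JobRunShell created earlier. In other words, the WorkerThread's run method simply delegates to the run method of the JobRunShell. The most important thing the JobRunShell's run method does is execute the following line:
job.execute(jec);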
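The hand-off from the pool to a worker, and the worker's delegation to the task, can be sketched as follows. This is a simplified stand-in, not Quartz's WorkerThread: the class WorkerSketch and the methods handOff, shutdown and runOnce are hypothetical, and the real class also reports back to its pool, which is left out here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single worker that waits for a handed-over Runnable and then runs it. */
public class WorkerSketch extends Thread {
    private final Object lock = new Object();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private Runnable runnable; // guarded by lock

    /** Hands a task to this worker and wakes it up. */
    public void handOff(Runnable task) {
        synchronized (lock) {
            runnable = task;
            lock.notifyAll();
        }
    }

    /** Asks the worker to stop once it is idle. */
    public void shutdown() {
        running.set(false);
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    @Override
    public void run() {
        while (running.get()) {
            Runnable task;
            synchronized (lock) {
                // Wait until a task has been handed over or shutdown was requested.
                while (runnable == null && running.get()) {
                    try {
                        lock.wait(500);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                task = runnable;
                runnable = null;
            }
            if (task != null) {
                task.run(); // the worker only delegates; the task does the real work
            }
        }
    }

    /** Starts a worker, runs one task on it, and reports whether the task finished. */
    public static boolean runOnce(Runnable task) {
        WorkerSketch worker = new WorkerSketch();
        CountDownLatch done = new CountDownLatch(1);
        worker.start();
        worker.handOff(() -> {
            task.run();
            done.countDown();
        });
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean ok = runOnce(() ->
                System.out.println("job executed on " + Thread.currentThread().getName()));
        System.out.println("completed: " + ok);
    }
}
```

Keeping the worker ignorant of what it runs is the point of the design: the pool deals only in Runnable, and everything job-specific lives inside the object that is handed over.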
Taking NativeJob as an example of a Job implementation, its execute method looks like this:
public void execute(JobExecutionContext context)
throws JobExecutionException {
JobDataMap data = context.getMergedJobDataMap();
String command = data.getString(PROP_COMMAND);
String parameters = data.getString(PROP_PARAMETERS);
if (parameters == null) {
parameters = "";
}
boolean wait = true;
if(data.containsKey(PROP_WAIT_FOR_PROCESS)) {
wait = data.getBooleanValue(PROP_WAIT_FOR_PROCESS);
}
boolean consumeStreams = false;
if(data.containsKey(PROP_CONSUME_STREAMS)) {
consumeStreams = data.getBooleanValue(PROP_CONSUME_STREAMS);
}
Integer exitCode = this.runNativeCommand(command, parameters, wait, consumeStreams);
context.setResult(exitCode);
}
The key step in NativeJob's execute method is the call to runNativeCommand, whose implementation is shown in Listing 12. This is how Quartz runs a native shell command.
Listing 12
private Integer runNativeCommand(String command, String parameters, boolean wait, boolean consumeStreams) throws JobExecutionException {
String[] cmd;
String[] args = new String[2];
Integer result = null;
args[0] = command;
args[1] = parameters;
try {
// this variable drives the OS-specific switching
String osName = System.getProperty("os.name");
// specific for Windows
if (osName.startsWith("Windows")) {
cmd = new String[args.length + 2];
if (osName.equals("Windows 95")) { // windows 95 only
cmd[0] = "command.com";
} else {
cmd[0] = "cmd.exe";
}
cmd[1] = "/C";
System.arraycopy(args, 0, cmd, 2, args.length);
} else if (osName.equals("Linux")) {
cmd = new String[3];
cmd[0] = "/bin/sh";
cmd[1] = "-c";
cmd[2] = args[0] + " " + args[1];
} else { // try this...
cmd = args;
}
Runtime rt = Runtime.getRuntime();
// Executes the command
getLog().info("About to run " + cmd[0] + " " + cmd[1] + " " + (cmd.length>2 ? cmd[2] : "") + " ...");
Process proc = rt.exec(cmd);
// Consumes the stdout from the process
StreamConsumer stdoutConsumer = new StreamConsumer(proc.getInputStream(), "stdout");
// Consumes the stderr from the process
if(consumeStreams) {
StreamConsumer stderrConsumer = new StreamConsumer(proc.getErrorStream(), "stderr");
stdoutConsumer.start();
stderrConsumer.start();
}
if(wait) {
result = proc.waitFor();
}
// any error message?
} catch (Throwable x) {
throw new JobExecutionException("Error launching native command: ", x, false);
}
return result;
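}
Once a job has been run, some follow-up processing is still performed through the JobStore's triggeredJobComplete method (see Listing 9). Taking JobStoreSupport as an example, its triggeredJobComplete method is implemented as shown in Listing 13.
Listing 13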
public void triggeredJobComplete(final OperableTrigger trigger,
final JobDetail jobDetail, final CompletedExecutionInstruction triggerInstCode) {
retryExecuteInNonManagedTXLock(
LOCK_TRIGGER_ACCESS,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
triggeredJobComplete(conn, trigger, jobDetail,triggerInstCode);
}
});
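}
As you can see, it calls retryExecuteInNonManagedTXLock, whose logic is very similar to that of executeInNonManagedTXLock and which ultimately calls back the executeVoid method of the anonymous VoidTransactionCallback class. The inner triggeredJobComplete method then does the final housekeeping, such as removing the fired-trigger instance from the QRTZ_FIRED_TRIGGERS table and updating the trigger's state to complete or error.
Postscript: my book 《深入理解Spark:核心思想与源码分析》 (Understanding Spark in Depth: Core Ideas and Source Code Analysis) has now been published and is on sale at JD.com, Dangdang, Tmall and other sites. Interested readers are welcome to buy a copy.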

JD.com: http://item.jd.com/11846120.html
Dangdang: http://product.dangdang.com/23838168.html
Quartz and Spring Integration: Execution Analysis of QuartzSchedulerThread
Original article: http://blog.csdn.net/beliefer/article/details/52315730