Quartz任务与日志的持久化
2017-11-25 / modified at 2022-04-04 / 1.5k words / 6 mins
️This article has been over 2 years since the last update.

定时任务在企业IT与互联网系统中使用非常广泛,一般用于去做耗时的分析、统计、报表、对账等任务. 现实开发中,一般采用Quartz作为Job实现。但是直接基于内存使用有如下痛点

  • 断电无法保存
  • 无法实现分布式锁
  • 无法回溯问题

本文将对此进行解决

1. 使用Quartz的Job持久化功能

在分析前,首先要上一个HelloWorld,首先集成Quartz,配置好数据源,并在数据库中刷好DDL,这个网上有很多入门教程

1
2
3
# 配置JobStore的实现类
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate

然后对Job进行调度,查询qrtz_job_details就可以看到具体的内容了。

1.1. 悲观锁分析

当业务需要对某个定时任务进行调度时,将在业务代码中调用StdScheduler#scheduleJob(),通过对其实现类进行分析,它实际上调用的是

1
org.quartz.core.QuartzScheduler#scheduleJob(org.quartz.JobDetail, org.quartz.Trigger)

这里并没有立即执行任务,而是将Job放入了JobStore中(有点类似于队列)

1
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

此时,如果你的代码量阅读够大,有一定的嗅觉,你一定会对resources.getJobStore进行findUsage搜索,经过各种搜索,我们固然发现了锁(QRTZ_LOCKS)的实现,主要位于StdRowLockSemaphoreJobStoreSupport

下文删除了部分代码以简化板面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
// 这里的`for update`是对当前row的互斥锁,如果没拿到将阻塞等待
// select * from QRTZ_LOCKS t where t.lock_name=${lockName} for update
// insert into (SCHED_NAME, LOCK_NAME) VAULE ...
transOwner = getLockHandler().obtainLock(conn, lockName);
// 执行更新Trigger/Job等非安全的操作
final T result = txCallback.execute(conn);
try {
// 执行Commit提交事务,释放锁
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
}
//调用 notifyAll();
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if (sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} finally {
try {
// 此部分仅更新ThreadLocal,不涉及DB
releaseLock(lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
}

总的来说,就是利用for update拖着不提交,等干完活再提交来实现的。

1.2. 防止程序同时执行

在Quartz中,通过注解@DisallowConcurrentExecution实现,当Trigger在完成retrieveJob后,将检查Job的注解,如果Job已经在运行,将在循环中continue跳过此Job

2. 定制历史记录插件

1. Quartz的Plugin系统

插件系统可以充分利用各种Listener,在Trigger或者Job执行生命周期中实现AOP定制,比如下面就是在Trigger中加日志的插件。在配置文件中加入如下后,当Tigger完成后将自动打上Log。

1
2
3
4
org.quartz.plugin.his.class=org.quartz.plugins.history.LoggingTriggerHistoryPlugin
org.quartz.plugin.his.triggerFiredMessage=Trigger [{1}.{0}] fired job ...
org.quartz.plugin.his.triggerCompleteMessage=Trigger [{1}.{0}] completed ...
org.quartz.plugin.his.triggerMisfiredMessage=Trigger [{1}.{0}] misfired job ...

插件的实现原理很简单,它没有依赖Spring注入,而是在Factory初始化读取Prop时通过Class.newInstance生成对象实例,并读取prop(比如上面的triggerFiredMessage)调用反射setTriggerFiredMessage()注入属性

1
2
3
4
5
6
7
// 通过Class.newInstance,并反射`setProp`注入参数
org.quartz.impl.StdSchedulerFactory#instantiate();
// 以上图为例,首先截取掉前面的Group,然后实例化
def listenerClass = "org.quartz.plugins.history.LoggingTriggerHistoryPlugin"
def listener = loadHelper.loadClass(listenerClass).newInstance();
def m = listener.getClass().getMethod("set" + "TriggerFiredMessage", [String.class]);
m.invoke(listener,"Trigger [{1}.{0}] fired job ...")

这里我曾经由于强迫症,尝试使用YAML替代Properties,事实证明这个是需要进行一定技巧的改造,由于直接通过Yaml转过来的是HashMap树形结构,而不是平行的关系,因此你需要设计一个Flattern操作来实现。

2. 历史执行记录的Plugin定制

定制历史记录主要是方便日后有据可查(有锅不背),以减少黑盒问题。下文只提供思路,不提供源码。

2.1. 定制Listener

定制JobListenerSchedulerPlugin接口,并在jobWasExecuted中进行记录操作,此处可以参考LoggingJobHistoryPlugin,并通过context获取Fire相关,Map相关以及Result相关

1
2
3
4
5
public interface JobListener {
//重写如下方法,并写入日志
void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException);
}

除了cron等基本信息外,这里最重要的就是JobExecutionContext.getResult()还有任务中的dataMap,需要结合具体业务进行反序列化,这里也就是为什么网上基本看不到开源实现的原因,因为都是与业务强绑定的。

2.2. 定制DBAppender

写入可以参考LogbackDBAppender的实现方式,不借助ORM框架实现高性能写入日志,拼装与commit操作,由于我删除了Event,因此没有加锁(本身没有加锁场景),可以参考这里,伪代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
String getInsertSQL(){
//此部分使用了SEQ,代替了默认的Trigger,只在Oracle下测试通过
return "INSERT INTO JOB_HIS (ID, ....) VALUES (TASK_SQL.nextval, ?, ?, ?, ?...)";
}

protected appendDBlog(T t){
Connection connection = null;
PreparedStatement insertStatement = null;
try{
connection = getConnection();
connection.setAutoCommit(false);
insertStatement = connection.prepareStatement(getInsertSQL(), new String[] { EVENT_ID_COL_NAME });
prepareStatement.setString(1,..);
....
connection.commit();
}catch(Exception e){
//各种异常
}finally{
//关闭DB流
}
}

此处主要难点在移植DBAppeder,并复用Quartz的DataSource。

2.3. 定制日志DDL

DDL可以参考Logback的SQL,基本上可以复用,但是在Logback中,默认是使用的Trigger,由于这个Trigger权限有时候不好搞到,我们可以用SEQ代替。

当日志快要满时,我们可以手动或者Trigger删除旧的日志

1
delete from JOB_HIS where FIRE_DATE < sysdate - 90;

总结

目前此日志插件在内部已经正常使用(主要还是并发不大,而且侧重点在可以回溯问题),希望能给踩坑Quartz的读者提供一个思路。

不足

  • 对时间要求极高,需要1s以内
  • 对服务化支持不足,但是已经有了改进,可以参考我的Eureka相关GitBook