开源项目中的线程池
下面介绍几个开源项目中如何配置的线程池
JDK自带Executors里面几个线程池
这几个就不介绍了,可以直接看JavaDoc
corePoolSize | maximumPoolSize | keepAlive | BlockingQueue | |
---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue |
newFixedThreadPool | ${nThreads} | ${nThreads} | 0s | LinkedBlockingQueue |
newSingleThreadExecutor | 1 | 1 | 0s | LinkedBlockingQueue |
初始线程数并不是越大越好,它最终取决于代码中可以并行运算的比例。当并发线程太多时,系统整体性能反而会下降,因为系统把很多时间花在了线程调度上,详见阿姆达尔定律
RxJava中几个线程池
在RxJava2中主要有如下4个线程池(划掉的不进行分析)
SingleScheduler- ComputationScheduler
- IoScheduler
NewThreadScheduler
本文只专注于它的“执行”过程,而不是调度过程,最重要的部分在Worker
中,因此可以这样构造代码研究
// Groovy代码
5.times {
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Runnable() {
@Override
void run() {
println "inner = " + Thread.currentThread().getStackTrace().join('\n')
println "inner = " + Thread.currentThread().getName()
}
});
}
Thread.currentThread().join()
下面假设你已经把上面流程全部调通。以下为我的调试结果
线程池
: 一群只会干活的奋斗者,所有线程池构造都是newScheduledThreadPool(1, factory)
。Worker
: Worker是对线程池的包装,类似于基层领导,一个Worker被分配一个线程池,当然这个线程池可能身兼数职。EventLoopWorker
: 对线程池外部状态进行维护,可以看作某些公司的HR
Worker状态
创建
调用createWorker创建一个Worker
//IoScheduler
public Worker createWorker() {
//CachedWorkerPool: 变长队列,可以创建与缓存无数个线程池
return new EventLoopWorker(pool.get());
}
//ComputationScheduler
public Worker createWorker() {
//FixedSchedulerPool: 用数组pool[CPUSize]实现,通过取余调用(实现代码为`pool[i++%4]`)分配任务,不能创建更多的线程池
return new EventLoopWorker(pool.get().getEventLoop());
}
干活
work执行schedule
,最终调用线程池的executor.submit()
,并返回一个包装后的Future,这里没什么可讲的。
任务结束
onComplete
后RxJava框架调用FlowableSubscribeOn
的dispose
方法,最终主动调用worker.dispose()
//IoScheduler.EventLoopWorker
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
// 让线程池执行cancel命令,并清空所有Future
tasks.dispose();
// 设置Worker超时,加入“短命单”,60s后定时清理一次
// 这里原理是构造了每隔60s自动执行的`evictorService`,它是一个清洁线程池,定期执行`evictExpiredWorkers`方法,移除队列`expiringWorkerQueue`中超时的Worker
// releasing the pool<threadWorker> should be the last action
pool.release(threadWorker);
}
}
//ComputationScheduler.EventLoopWorker
@Override
public void dispose() {
if (!disposed) {
disposed = true;
// 斩立决,让线程池执行cancel命令,并清空所有Future
both.dispose();
}
}
IoScheduler的模式正是某些私营公司在工作量大时海量招聘时堆人,加班奋斗后变成甘蔗渣,最后进入短名单被HR定期优化的流程。
ComputationScheduler的模式是某些公司工作量大时加班再多也不招人,加班奋斗后变成甘蔗渣,最后直接被通知斩立决马上走人的流程。
通过断点也可以发现,由于IoScheduler是在static初始化并启动,因此无论是否使用IoScheduler,清洁工线程池evictorService将自动启动且永远不会自动关闭,除非将IoScheduler进行主动shutdown。发现了什么了吗,执行者只是可替代的资源,只有Pool才是核心竞争力!
Dubbo中的线程池
Dubbo是一款事务框架,这种纯框架中使用的线程池一般可以作为各种场景负载下的教科书,我们将Executors
作为关键字进行统计
打开IDEA,运行
cmd
+shift
+F
,搜索Executors.
即可
如果不使用IDEA,可以通过Shell的grep暴力搜索
grep 'Executors.' * -R|grep -v test|grep -v import|grep -v class
经过统计,最终如下
newCachedThreadPool: 10
最低线程限制为1个,处理实际业务与Socket(JBOSS、Netty、bootstrap)等
newScheduledThreadPool: 11
最低线程数在1~3之间
URL: 定期重连,清除过期者,心跳,延期等。调用BO失败后定期重试
文件: 更新定时统计/图表/日志, 检查文件更新(可用于热部署),
newFixedThreadPool: 1
线程限制固定为1个,用于文件缓存写入