开源项目中的线程池

下面介绍几个开源项目中如何配置的线程池

JDK自带Executors里面几个线程池

这几个就不介绍了,可以直接看JavaDoc

corePoolSizemaximumPoolSizekeepAliveBlockingQueue
newCachedThreadPool0Integer.MAX_VALUE60sSynchronousQueue
newFixedThreadPool${nThreads}${nThreads}0sLinkedBlockingQueue
newSingleThreadExecutor110sLinkedBlockingQueue

初始线程数并不是越大越好,它最终取决于代码中可以并行运算的比例。当并发线程太多时,系统整体性能反而会下降,因为系统把很多时间花在了线程调度上,详见阿姆达尔定律
AmdahlsLaw.svg.png

RxJava中几个线程池

在RxJava2中主要有如下4个线程池(划掉的不进行分析)

  • SingleScheduler
  • ComputationScheduler
  • IoScheduler
  • NewThreadScheduler

本文只专注于它的“执行”过程,而不是调度过程,最重要的部分在Worker中,因此可以这样构造代码研究

// Groovy代码
5.times {
    Scheduler.Worker worker = Schedulers.io().createWorker();
    worker.schedule(new Runnable() {
        @Override
        void run() {
            println "inner = " + Thread.currentThread().getStackTrace().join('\n')
            println "inner = " + Thread.currentThread().getName()
        }
    });
}
Thread.currentThread().join()

下面假设你已经把上面流程全部调通。以下为我的调试结果

  • 线程池: 一群只会干活的奋斗者,所有线程池构造都是newScheduledThreadPool(1, factory)
  • Worker: Worker是对线程池的包装,类似于基层领导,一个Worker被分配一个线程池,当然这个线程池可能身兼数职。
  • EventLoopWorker: 对线程池外部状态进行维护,可以看作某些公司的HR

Worker状态

创建

调用createWorker创建一个Worker

//IoScheduler
public Worker createWorker() {
    //CachedWorkerPool: 变长队列,可以创建与缓存无数个线程池
    return new EventLoopWorker(pool.get());
}

//ComputationScheduler
public Worker createWorker() {
    //FixedSchedulerPool: 用数组pool[CPUSize]实现,通过取余调用(实现代码为`pool[i++%4]`)分配任务,不能创建更多的线程池
    return new EventLoopWorker(pool.get().getEventLoop());
}
干活

work执行schedule,最终调用线程池的executor.submit(),并返回一个包装后的Future,这里没什么可讲的。

任务结束

onComplete后RxJava框架调用FlowableSubscribeOndispose方法,最终主动调用worker.dispose()

//IoScheduler.EventLoopWorker
@Override
public void dispose() {
  if (once.compareAndSet(false, true)) {
    // 让线程池执行cancel命令,并清空所有Future
    tasks.dispose();
    // 设置Worker超时,加入“短命单”,60s后定时清理一次
    // 这里原理是构造了每隔60s自动执行的`evictorService`,它是一个清洁线程池,定期执行`evictExpiredWorkers`方法,移除队列`expiringWorkerQueue`中超时的Worker
    // releasing the pool<threadWorker> should be the last action
    pool.release(threadWorker);
  }
}

 //ComputationScheduler.EventLoopWorker
 @Override
 public void dispose() {
   if (!disposed) {
     disposed = true;
     // 斩立决,让线程池执行cancel命令,并清空所有Future
     both.dispose();
   }
 }

IoScheduler的模式正是某些私营公司在工作量大时海量招聘时堆人,加班奋斗后变成甘蔗渣,最后进入短名单被HR定期优化的流程。

ComputationScheduler的模式是某些公司工作量大时加班再多也不招人,加班奋斗后变成甘蔗渣,最后直接被通知斩立决马上走人的流程。

通过断点也可以发现,由于IoScheduler是在static初始化并启动,因此无论是否使用IoScheduler,清洁工线程池evictorService将自动启动且永远不会自动关闭,除非将IoScheduler进行主动shutdown。发现了什么了吗,执行者只是可替代的资源,只有Pool才是核心竞争力!

Dubbo中的线程池

Dubbo是一款事务框架,这种纯框架中使用的线程池一般可以作为各种场景负载下的教科书,我们将Executors作为关键字进行统计

打开IDEA,运行cmd+shift+F,搜索Executors.即可

如果不使用IDEA,可以通过Shell的grep暴力搜索

grep 'Executors.' * -R|grep -v test|grep -v import|grep -v class

经过统计,最终如下

newCachedThreadPool: 10
最低线程限制为1个,处理实际业务与Socket(JBOSS、Netty、bootstrap)等

newScheduledThreadPool: 11
最低线程数在1~3之间

URL: 定期重连,清除过期者,心跳,延期等。调用BO失败后定期重试

文件: 更新定时统计/图表/日志, 检查文件更新(可用于热部署),

newFixedThreadPool: 1
线程限制固定为1个,用于文件缓存写入