CountDownLatch
英文名词
- predecessor| ˈpriːdɪsɛsə |: 前任者; 先輩. eg: the Prime Minister learned from his predecessor's mistakes.
- latch: 门闩,CountDownLatch表示自动倒计时关上的门锁
- elide | ɪˈlʌɪd |: 省略. eg: the null check may be elided.
应用
CountDownLatch一般用于异步转同步操作,比如SpringCloud/ElasticSearch中在主线程实现阻塞等待Response
比如SpringCloud中的Feign框架利用Rxjava等待异步网络请求,并转为同步操作
return command.submit(request)
// 类似于前端的await,在Java中用CountDownLatch实现
.toBlocking()
.single();
再比如ElasticSearch客户端中基于Netty异步请求,同样也是基于await实现的
CountDownLatch是并发编程中通过CAS队列包装实现的锁,先举个例子(Groovy代码)
def s = Executors.newCachedThreadPool()
def latch = new CountDownLatch(3)
3.times{
s.execute(new Runnable() {
@Override
void run() {
try {
println "start ${it}"
TimeUnit.SECONDS.sleep(2)
println "end ${it}"
}catch (Exception e){
e.printStackTrace()
}finally{
latch.countDown()
}
}
})
}
latch.await()
println "latch complete"
通过执行上面的Groovy代码段可以发现,无论executor线程池中怎么运行,只要不阻塞死,主线程一定会在所有线程池计数归零时才开始运行。
CountDownLatch的应用
App启动页
在某些App启动时,既要加载广告,又要加载首屏数据,涉及到两个请求,这时可以让两个线程同时跑,主线程
流式/链式调用回掉
WatchDog
CountDownLatch的源码分析
CountDownLatch代码很少,去掉注释只有100多行。它本质上是对队列AbstractQueuedSynchronizer
的包装,底层通过CAS实现了原子性,其中CAS之前文章已经写过了,本质是自旋与CPU硬件实现。
AbstractQueuedSynchronizer分析
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 此处通过包装CAS实现原子性
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Unsafe.compareAndSwapObject在JVM侧的C实现
其实这部分以前已经分析过,下面的代码是JamVM的实现,本质上是CPU硬件帮你把事情搞定了
// unsafe.compareAndSwapObject(node, nextOffset, expect, update)
uintptr_t *compareAndSwapObject(Class *class, MethodBlock *mb,
uintptr_t *ostack) {
long long offset = *((long long *)&ostack[2]);
uintptr_t *addr = (uintptr_t*)((char *)ostack[1] + offset);
uintptr_t expect = ostack[4];
uintptr_t update = ostack[5];
int result;
// 此部分通过 __asm__ 汇编代码段实现
result = COMPARE_AND_SWAP(addr, expect, update);
*ostack++ = result;
return ostack;
}