CountDownLatch

英文名词

  • predecessor| ˈpriːdɪsɛsə |: 前任者; 先輩. eg: the Prime Minister learned from his predecessor's mistakes.
  • latch: 门闩,CountDownLatch表示自动倒计时关上的门锁
  • elide | ɪˈlʌɪd |: 省略. eg: the null check may be elided.

应用

CountDownLatch一般用于异步转同步操作,比如SpringCloud/ElasticSearch中在主线程实现阻塞等待Response

比如SpringCloud中的Feign框架利用Rxjava等待异步网络请求,并转为同步操作

return command.submit(request)
    // 类似于前端的await,在Java中用CountDownLatch实现
    .toBlocking()
    .single();

再比如ElasticSearch客户端中基于Netty异步请求,同样也是基于await实现的

CountDownLatch是并发编程中通过CAS队列包装实现的锁,先举个例子(Groovy代码)

def s = Executors.newCachedThreadPool()
def latch = new CountDownLatch(3)
3.times{
    s.execute(new Runnable() {
        @Override
        void run() {
            try {
                println "start ${it}"
                TimeUnit.SECONDS.sleep(2)
                println "end ${it}"
            }catch (Exception e){
                e.printStackTrace()
            }finally{
                latch.countDown()
            }
        }
    })
}
latch.await()
println "latch complete"

通过执行上面的Groovy代码段可以发现,无论executor线程池中怎么运行,只要不阻塞死,主线程一定会在所有线程池计数归零时才开始运行。

CountDownLatch的应用

App启动页

在某些App启动时,既要加载广告,又要加载首屏数据,涉及到两个请求,这时可以让两个线程同时跑,主线程

流式/链式调用回掉

WatchDog

CountDownLatch的源码分析

CountDownLatch代码很少,去掉注释只有100多行。它本质上是对队列AbstractQueuedSynchronizer的包装,底层通过CAS实现了原子性,其中CAS之前文章已经写过了,本质是自旋与CPU硬件实现。

AbstractQueuedSynchronizer分析

private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedException {
  // 此处通过包装CAS实现原子性
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

Unsafe.compareAndSwapObject在JVM侧的C实现

其实这部分以前已经分析过,下面的代码是JamVM的实现,本质上是CPU硬件帮你把事情搞定了

//  unsafe.compareAndSwapObject(node, nextOffset, expect, update)
uintptr_t *compareAndSwapObject(Class *class, MethodBlock *mb,
                                uintptr_t *ostack) {

    long long offset = *((long long *)&ostack[2]);
    uintptr_t *addr = (uintptr_t*)((char *)ostack[1] + offset);
    uintptr_t expect = ostack[4];
    uintptr_t update = ostack[5];
    int result;
	// 此部分通过 __asm__ 汇编代码段实现
    result = COMPARE_AND_SWAP(addr, expect, update);
    *ostack++ = result;
    return ostack;
}