juc之CountDownLatch
CountDownLatch是什么?
CountDownLatch 是一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
通过指定 count 来初始化 CountDownLatch,
await()方法将阻塞直到线程使用 countDown() 方法减少count 直至为0时释放 所有等待的线程,以及被阻塞的方法将被立即返回。
另外, CountDownLatch 的 count 无法被重置。 如果需要重置 count, 可使用 CyclicBarrier。
CountDownLatch 使用场景
场景一: 某个线程等待N个线程执行完成后执行
- count=N:一个线程等待 其余N个线程完成操作,或者一个线程完成N次操作。
- CountDownLatch 初始值为N
- 等待的线程执行 await 操作,使得当前线程被阻塞(使用LockSupport阻塞) [这里也可以在main中阻塞]
- 每个线程执行完毕后,调用countDown()
- 当计数器值变为0,被阻塞的县城会被唤醒,执行待执行的动作。
1 | import java.util.concurrent.CountDownLatch; |
场景二: 多个线程等待某个时刻同时执行
- count=1: 所有线程调用 await()方法等待直到有另外的一个线程调用countDown()方法
1 | import java.util.concurrent.CountDownLatch; |
CountDownLatch 原理解析
CountDownLatch 提供了如下方法:
- await(): 使当前线程等待 count=0 [volatile变量],或者当前线程中断。
- await(long, TimeUnit):
- countDown(): 减少latch计数值,当计数值为0时释放所有等待的线程。
- getCount(): 获取当前的计数值
其中 await() 和 countDown() 是最重要的方法,来看其实现
CountDownLatch.await()
1 | public void await() throws InterruptedException { |
acquireSharedInterruptibly
先检查中断状态,然后调用tryAcquireShared
如果成功,则成功获取到共享锁。如果当前同步计数值 大于 0,则当前线程入队,重复调用 tryAcquireShared
直至成功或者中断。
其中 tryAcquireShared
的源码如下:
1 | protected int tryAcquireShared(int acquires) { |
尝试以共享模式获取。此方法应查询对象的状态是否允许在共享模式下获取该对象,如果允许则进行获取。
1 | private void doAcquireSharedInterruptibly(int arg) |
这段代码的逻辑如下:
- 为当前线程创造 node 节点并入队
- 死循环检查,当前节点是否为CLH队列中的第一个节点,如果是,则调用tryAcquireShared判断当前的同步状态计数值是否为0,如果是则手动将当前node的next置为null以辅助GC。
- 调用
shouldParkAfterFailedAcquire
检查并更新无法获取的节点的状态。 - 调用
parkAndCheckInterrupt
park 并检查当前线程是否中断。 - 如果 有抛出异常,则 调用
cancelAcquire
展开来看:
- addWaiter
1 | private Node addWaiter(Node mode) { |
- setHeadAndPropagate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
CountDownLatch 内部使用 Sync
类进行同步控制,其内部使用AQS
状态表示count。 CountDownLatch使用的是共享锁。
1 | private static final class Sync extends AbstractQueuedSynchronizer { |
CountDownLatch 内部定义了 静态内部类Sync
, Sync
类中提供了三个方法,分别是:
- getCount():
- tryAcquireShared(int acquires)
- tryReleaseShared(int releases)
如上三个方法父类java.util.concurrent.locks.AbstractQueuedSynchronizer
定义,来看实现:
getState
getState
用以获取同步状态
1 | int getCount() { return getState();} |
getState
返回当前的同步状态,该方法由父类AbstractQueuedSynchronizer
定义,且由final
修饰,子类不能修改。
1 | protected final int getState() { |
state 的定义如下:
1 | private volatile int state; |
tryAcquireShared
tryAcquireShared
:在共享模式下尝试获取锁,在获取锁之前需要先检查锁是否被占用,如果未被占用,则使用CAS
修改同步状态。
在尝试获取锁前调用,如果失败则将当前线程放入等待队列中,直到有锁被释放为止。
1 | protected int tryAcquireShared(int acquires) { |
tryReleaseShared
1 | protected boolean tryReleaseShared(int releases) { |
其中compareAndSetState
是CAS
原子操作,如果给定的值和当前同步状态值一致,则将同步状态修改为待更新的值。
疑惑
VarHandle是什么?
waitStatus = 0 的含义?
1 | /** waitStatus value to indicate thread has cancelled. */ |
引用
1. Java AQS 核心数据结构-CLH 锁
2. AQS详解
3. CountDownLatch的应用场景及最佳实践
4. juc全局观