CountDownLatch是什么?

CountDownLatch 是一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
通过指定 count 来初始化 CountDownLatch,
await()方法将阻塞直到线程使用 countDown() 方法减少count 直至为0时释放 所有等待的线程,以及被阻塞的方法将被立即返回。
另外, CountDownLatch 的 count 无法被重置。 如果需要重置 count, 可使用 CyclicBarrier。

CountDownLatch 使用场景

场景一: 某个线程等待N个线程执行完成后执行
  • count=N:一个线程等待 其余N个线程完成操作,或者一个线程完成N次操作。
  1. CountDownLatch 初始值为N
  2. 等待的线程执行 await 操作,使得当前线程被阻塞(使用LockSupport阻塞) [这里也可以在main中阻塞]
  3. 每个线程执行完毕后,调用countDown()
  4. 当计数器值变为0,被阻塞的县城会被唤醒,执行待执行的动作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.CountDownLatch;
public class CountDownLatch3 {
public static void main(String[] args) throws InterruptedException {
int count =3;
CountDownLatch countDownLatch = new CountDownLatch(count);
for (int i=0; i< count; i++){
new Thread(()->{

System.out.println(Thread.currentThread().getName() + "执行任务");
countDownLatch.countDown();
}).start();
}
System.out.println(Thread.currentThread().getName() + "阻塞等待.");
countDownLatch.await();
}
}
场景二: 多个线程等待某个时刻同时执行
  • count=1: 所有线程调用 await()方法等待直到有另外的一个线程调用countDown()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo2 {
public static void main(String[] args) throws InterruptedException {
int athleteCount = 3;
CountDownLatch athletes = new CountDownLatch(athleteCount);
CountDownLatch supervisor = new CountDownLatch(1);

for (int i=0; i< athleteCount; i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "等待裁判员鸣枪起跑");
try {
supervisor.await();
System.out.println(Thread.currentThread().getName() + "收到裁判员起跑命令");
System.out.println(Thread.currentThread().getName() + "起跑");
athletes.countDown();
System.out.println(Thread.currentThread().getName() + "完成比赛");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
// 发出起跑命令
supervisor.countDown();
// 等待所有 运动员完成比赛
athletes.await();
}
}

CountDownLatch 原理解析

CountDownLatch 提供了如下方法:

  • await(): 使当前线程等待 count=0 [volatile变量],或者当前线程中断。
  • await(long, TimeUnit):
  • countDown(): 减少latch计数值,当计数值为0时释放所有等待的线程。
  • getCount(): 获取当前的计数值

其中 await() 和 countDown() 是最重要的方法,来看其实现

CountDownLatch.await()

1
2
3
4
5
6
7
8
9
10
11
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

acquireSharedInterruptibly先检查中断状态,然后调用tryAcquireShared如果成功,则成功获取到共享锁。如果当前同步计数值 大于 0,则当前线程入队,重复调用 tryAcquireShared直至成功或者中断。
其中 tryAcquireShared 的源码如下:

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

尝试以共享模式获取。此方法应查询对象的状态是否允许在共享模式下获取该对象,如果允许则进行获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

这段代码的逻辑如下:

  • 为当前线程创造 node 节点并入队
  • 死循环检查,当前节点是否为CLH队列中的第一个节点,如果是,则调用tryAcquireShared判断当前的同步状态计数值是否为0,如果是则手动将当前node的next置为null以辅助GC。
  • 调用 shouldParkAfterFailedAcquire 检查并更新无法获取的节点的状态。
  • 调用 parkAndCheckInterrupt park 并检查当前线程是否中断。
  • 如果 有抛出异常,则 调用 cancelAcquire

展开来看:

  1. addWaiter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
Node node = new Node(mode);

for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
  1. setHeadAndPropagate
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
    * Try to signal next queued node if:
    * Propagation was indicated by caller,
    * or was recorded (as h.waitStatus either before
    * or after setHead) by a previous operation
    * (note: this uses sign-check of waitStatus because
    * PROPAGATE status may transition to SIGNAL.)
    * and
    * The next node is waiting in shared mode,
    * or we don't know, because it appears null
    *
    * The conservatism in both of these checks may cause
    * unnecessary wake-ups, but only when there are multiple
    * racing acquires/releases, so most need signals now or soon
    * anyway.
    */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    doReleaseShared();
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}

CountDownLatch 内部使用 Sync 类进行同步控制,其内部使用AQS状态表示count。 CountDownLatch使用的是共享锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

CountDownLatch 内部定义了 静态内部类SyncSync类中提供了三个方法,分别是:

  • getCount():
  • tryAcquireShared(int acquires)
  • tryReleaseShared(int releases)

如上三个方法父类java.util.concurrent.locks.AbstractQueuedSynchronizer定义,来看实现:

getState

getState 用以获取同步状态

1
int getCount() { return getState();}

getState返回当前的同步状态,该方法由父类AbstractQueuedSynchronizer定义,且由final修饰,子类不能修改。

1
2
3
protected final int getState() {
return state;
}

state 的定义如下:

1
private volatile int state;

tryAcquireShared

tryAcquireShared:在共享模式下尝试获取锁,在获取锁之前需要先检查锁是否被占用,如果未被占用,则使用CAS修改同步状态。
在尝试获取锁前调用,如果失败则将当前线程放入等待队列中,直到有锁被释放为止。

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

tryReleaseShared

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

其中compareAndSetStateCAS原子操作,如果给定的值和当前同步状态值一致,则将同步状态修改为待更新的值。

疑惑

VarHandle是什么?
waitStatus = 0 的含义?

1
2
3
4
5
6
7
8
9
10
11
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate.
*/
static final int PROPAGATE = -3;

引用

1. Java AQS 核心数据结构-CLH 锁
2. AQS详解
3. CountDownLatch的应用场景及最佳实践
4. juc全局观