一、概述
前面我们讲解了ReentrantLock,CountDownLatch,Semaphore的源码,他们都是由AQS来实现的,而CyclicBarrier则是通过ReentrantLock+Condition实现的
CyclicBarrier即为栏栅,比如短跑比赛,5人一组,必须5人到齐准备好了了,才开始跑,5人跑走以后,后面第二组同样等到齐准备好了,继续开始跑。我们就可以把人比作线程。
二、CyclicBarrier框架
private static class Generation {
boolean broken = false;
}
/** 定义一个可重入锁*/
private final ReentrantLock lock = new ReentrantLock();
/** 定义一个 Condition */
private final Condition trip = lock.newCondition();
/** 定义栏栅parties */
private final int parties;
/** 定义执行线程 */
private final Runnable barrierCommand;
/** 定义generation */
private Generation generation = new Generation();
/** 定义count */
private int count;
- public int await() //调用的线程被挂起,知道满足珊栏的条件
- public int await(long timeout, TimeUnit unit)
//调用的线程被挂起,知道满足珊栏的条件或者时间达到传
入的值
三、源码解读
定义一个栏栅为3,A,B,C三个线程分别去执行,A,B线程先准备完成,C线程10s后准备完成
最后要等C线程准备完成后才执行
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
new Thread(()->{
try {
System.out.println("A 准备完成 ----------");
cyclicBarrier.await();
System.out.println("A 执行 ----------");
}catch (Exception e){
}
},"A").start();
new Thread(()->{
try {
System.out.println("B 准备完成 ----------");
cyclicBarrier.await();
System.out.println("B 执行 ----------");
}catch (Exception e){
}
},"B").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(10);
System.out.println("C 准备完成 ----------");
cyclicBarrier.await();
System.out.println("C 执行 ----------");
}catch (Exception e){
}
},"C").start();
分析源码,进入await();方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
进入dowait(false, 0L);
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//加锁操作,保证线程安全
try {
final Generation g = generation;
if (g.broken)//Generation维护了一个broken变量,表示是否打破屏障。如果屏障已经打破或线程已经停止则都会抛出异常。
throw new BrokenBarrierException();
if (Thread.interrupted()) {//线程中断
breakBarrier();//唤醒其他线程
throw new InterruptedException();//抛出异常
}
int index = --count;//计数-1并获取
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;//构造方法传入的一个Runnable
if (command != null)
command.run();//若有传入,则唤醒所有线程前先执行指定的任务
ranAction = true;
nextGeneration();//到达栏栅,count=0,唤醒其他线程
return 0;
} finally {
if (!ranAction)
breakBarrier();//唤醒其他线程
}
}
for (;;) {
try {
if (!timed)//根据传入的参数来决定是定时等待还是非定时等待
trip.await();//未到达栏栅,线程挂起,释放锁
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)//如果线程因为打翻栅栏操作而被唤醒则抛出异常
throw new BrokenBarrierException();
if (g != generation) //如果线程因为换代操作而被唤醒则返回计数器的值
return index;
if (timed && nanos <= 0L) {//如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
首先A线程 进入方法,加锁,判断是否打破屏障,是否中断等(特殊情况),若都没有则判断count的值
count初始化=parties=2,index=–count=2 不为0则进入下一步自旋,调用trip.await();线程挂起,释放锁。接下来B线程 进入方法,加锁,判断是否打破屏障,是否中断等(特殊情况),若都没有则判断count的值
count=1,index=–count=1 不为0则进入下一步自旋,调用trip.await();线程挂起,释放锁
10s后C线程进入 count=0,index=–count=0,进入nextGeneration();方法
private void nextGeneration() {
// 唤醒其他挂起的线程
trip.signalAll();
// 设置count为初始值,便于重复使用
count = parties;
generation = new Generation();// 重新维护Generation
}
此时ABC三个线程都可以执行了
再来说说trip.await();方法,属于Condition接口,在AQS中实现的
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//将当前线程包装下后
//添加到Condition自己维护的一个链表中。
int savedState = fullyRelease(node);//释放当前线程占有的锁
//调用await前,当前线程是占有锁的
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//释放完毕后,遍历AQS的队列,看当前节点是否在队列中
//不在 说明它还没有竞争锁的资格,所以继续将自己沉睡。
//直到它被加入到队列中,聪明的你可能猜到了,
//没有错,在singal的时候加入不就可以了?
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
核心还是LockSupport.park(this);挂起线程
trip.signalAll();也在AQS中实现
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;//firstWaiter为condition自己维护的一个链表的头结点,
//取出第一个节点后开始唤醒操作
if (first != null)
doSignalAll(first);
}
四、总结
CyclicBarrier的核心就是await()方法等待后,执行nextGeneration()唤醒所有线程
CyclicBarrier底层基于ReentrantLock和Condition实现,如果count不为0,则调用Condition的await方法
让线程等待执行,当count为0时,调用Condition的singleAll唤醒全部等待的线程执行