一. CountDownLatch 减少计数器
- CountDownLatch 减少计数器,有点像倒计,计数器,当减少为0时线程执行
- 作用是允许1或者多个线程,等待另外N个线程完成某件事情之后,这1个或者多个线程才能执行。CyclicBarrier 是N个线程相互等待,任何一个线程完成任务之前,所有的线程必须等待。
- CountDownLatch 计数器是一次性的,无法被重置的,而CyclicBarrier的计数器在调用reset方法之后,还可以重新使用,因此被称为循环的barrier
public class T1 {
public static void main(String[] args) throws InterruptedException {
//1.创建一个 CountDownLatch 初始化计数为3
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(()->{
//2.计数减1
countDownLatch.countDown();
System.out.println("线程执行计数减1");
},String.valueOf(i)).start();
}
//3.判断CountDownLatch中的计数是否为0,如果不为0会阻塞,当为0时,会在此位置唤醒继续向下执行
countDownLatch.await();
//4.当CountDownLatch中的计数减为0时才会执行
System.out.println("计数为0");
}
}
二. CyclicBarrier 循环栅栏
- CyclicBarrier 循环栅栏与 CountDownLatch相反的计数器
public static void main(String[] args){
//1.创建一个实际工作线程,提供工作的run方法
Thread myThread = new Thread(() -> {
System.out.println("实际工作线程工作---->");
}, "实际工作线程");
//2.创建一个 CyclicBarrier 需要两个参数
//第一个:指定计数器值
//第二个:需要一个 Runnable 类型变量,当计数器值到达时自动执行 Runnable 工作方法
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, myThread);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println("计数器线程执行");
//3.当多线程执行时,会判断是否达到计数器设置的值
//如果没达到会阻塞,并且每执行一次累计加1,当达到时
//自动执行CyclicBarrier 构造器中传入的线程myThread工作方法
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
三. Semaphore 信号灯
- Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数(当运行的并发访问数为1时,就有点像 synchronized)
- Semaphore的主要方法摘要:
- void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
- void release():释放一个许可,将其返回给信号量。
- int availablePermits():返回此信号量中当前可用的许可数。
- boolean hasQueuedThreads():查询是否有线程正在等待获取。
- 代码示例
public static void main(String[] args) throws InterruptedException {
//1.创建一个 Semaphore 初始化资源运行的最大并发量为3
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
//2.当前线程执行抢占资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢占到了资源");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//3.线程执行完毕,释放资源
System.out.println(Thread.currentThread().getName()+"释放资源");
semaphore.release();
}
},String.valueOf(i)).start();
}
}
四. CountDownLatch 底层实现
- 在使用CountDownLatch时首先要创建一个CountDownLatch对象,并初始化计数,了解CountDownLatch底层要在三个部分,调用构造器创建, 调用await()方法判断计数器是否为0,获取锁,如果获取成功则执行,否则阻塞,调用countDown()方法计数减一
- 查看ConutDownLatch构造器,与内部结构会发现CountDownLatch中有一个继承了AbstractQueuedSynchronizer 内部类Sync,整个CountDownLatch是基于AQS实现的,在AQS中有一个被volatile修饰的int类型属性state,通过通过state进行计数
- 唤醒与阻塞底层通过Unsafe 的 unpark()唤醒,park()阻塞实现的
//构造器,调用构造器创建时,内部会判断计数器是否大于0,会创建一个Sync对象(AQS)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//阻塞方法,会执行AQS中的acquireSharedInterruptibly方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//1.该方法其实调用AQS中的releaseShared(1)释放共享锁方法。
public void countDown() {
sync.releaseShared(1);
}
await() 判断state,也可以简单理解为计数,如果为0获取锁成功,当前线程执行,否则获取锁失败,阻塞当前线程
- 调用await()方法,内部调用的是AQS的acquireSharedInterruptibly()方法,执行tryAcquireShared(arg)获取AQS中state的值是否为0,如果是0说明没没有被其它线程获取大锁,直接放行,否则执行doAcquireSharedInterruptibly()
//1.获取共享锁
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//判断线程是否为中断状态,如果是抛出interruptedException
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取共享锁,尝试成功就返回,否则调用doAcquireSharedInterruptibly方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//2.尝试获取共享锁,重写AQS里面的方法
protected int tryAcquireShared(int acquires) {
//锁状态 == 0,表示所没有被任何线程所获取,即是可获取的状态,否则锁是不可获取的状态
return (getState() == 0) ? 1 : -1;
}
- doAcquireSharedInterruptibly方法会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回,该方法中会创建一个存储当前线程的Node节点,并将节点添加到CLH队列末尾,
- 判断当前节点的上一个节点在AQS中是否是AQS的头节点,如果是,说明正在执行的线程如果执行完毕,是否锁后,下一个执行的线程就是当前节点持有的线程,会调用tryAcquireShared(arg)方法再次尝试获取锁,减少不必要的线程阻塞与唤醒
- 如果获取锁失败,或者当前node节点的上一个节点不是AQS的头节点,执行shouldParkAfterFailedAcquire(p, node)方法,根据当前当前节点的上一个节点状态,维护自身的状态
//3.doAcquireSharedInterruptibly方法会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//创建“当前线程”的Node节点,且node中记录的锁是“共享锁”类型,并将节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前继节点,如果前继节点是等待锁队列的表头,则尝试获取共享锁
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//前继节点不是表头,当前线程一直等待,直到获取到锁
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//4.通过以下规则,判断“当前线程”是否需要被阻塞,此处规则中的状态是AQS中的Node节点的状态
//实际该方法就是通过前继节点的状态维护自身的状态
//规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
//规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
//规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
// 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
if (ws == Node.SIGNAL)
return true;
// 如果前继节点是取消的状态,则设置当前节点的“当前前继节点为”原节点的前继节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
countDown()
//2.目的是让当前线程释放它所持有的共享锁,它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//3.tryReleaseShared()在CountDownLatch.java中被重写,释放共享锁,将锁计数器-1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 获取“锁计数器”的状态
int c = getState();
if (c == 0)
return false;
// “锁计数器”-1
int nextc = c-1;
// 通过CAS函数进行赋值。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}