AQS
什么是AQS
AQS(AbstractQueuedSynchronizer)即抽象队列同步器,是一套可以实现同步锁机制的框架,是许多JUC内同步框架的基石。AQS通过一个FIFO的队列维护线程同步状态,实现类只需要继承该类,并重写指定方法既可以实现一套线程同步机制。
AQS原理
简单的说,AQS维护了一个volatile int state
变量和CLH(三个人名字的缩写)
双向队列,和一个ConditionObject(后续说明)组成
- state:线程通过对state的修改来获取锁,如果修改失败将会被包装为节点挂载到队列中,等待持有锁的线程释放锁并唤醒队列中的节点
private volatile int state;
AQS中提供了获取和设置state的实现:
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
waitStatus有五种取值:
CANCELLED = 1
。节点引用线程由于等待超时或被打断时的状态。SIGNAL = -1
。后继节点线程需要被唤醒时的当前节点状态。当队列中加入的后继节点被挂起(block)
时,其前驱节点会被设置为SIGNAL
状态,表示该节点需要被唤醒。CONDITION = -2
。当节点线程进入condition
队列时的状态。(见ConditionObject
)PROPAGATE = -3
。仅在释放共享锁releaseShared
时对头节点使用。(见共享锁分析)0
。节点初始化时的状态。
获取锁失败的线程会被包装为节点,加入CLH双向队列中,结构如下:
AQS提供的两种锁
在此之前,我们先举一个例子:
- Mutex类继承AQS,实现其中方法,用来做获取锁和释放锁的操作
@Slf4j
public class Mutex extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0,1);
}
@Override
protected boolean tryRelease(int arg) {
return compareAndSetState(1,0);
}
@Override
protected boolean tryRelease(int arg) {
return compareAndSetState(1,0);
}
@Override
protected int tryAcquireShared(int arg) {
return super.tryAcquireShared(arg);
}
}
其中compareAndSetState()
方法:
是一个CAS操作来确定更改值的
独占锁
我们Mutex类中写一个main方法,用来测试独占锁
public static void main(String[] args) {
Mutex mutex = new Mutex();
Thread t1 = new Thread(()->{
log.debug("t1尝试获取锁");
mutex.acquire(1);
log.debug("t1获取锁成功");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("t1准备释放锁");
mutex.release(1);
log.debug("t1锁已释放");
},"t1");
Thread t2 = new Thread(()->{
log.debug("t2尝试获取锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
mutex.acquire(1);
log.debug("t2获取锁成功");
mutex.release(1);
log.debug("t2锁已释放");
},"t2");
t1.start();
t2.start();
}
运行结果:
17:52:42.623 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1尝试获取锁
17:52:42.623 [t2] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t2尝试获取锁
17:52:42.626 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1获取锁成功
17:52:45.632 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1准备释放锁
17:52:45.632 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1锁已释放
17:52:45.632 [t2] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t2获取锁成功
17:52:45.632 [t2] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t2锁已释放
首先可以肯定是的,t1一定先获取锁,并且在3s后t2才能获取锁
AQS中提供了获取独占锁的方法acquire()
和释放锁的方法release()
加锁方法acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
整个方法的执行流程为:
流程分析:
- tryAcquire() 自己实现的方法
- addWaiter():向队列中添加当前节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter
将线程包装为独占节点,尾插式加入到队列中,如队列为空,则会添加一个空的头节点,内容也为null
- acquireQueued():将队列中的每个node尝试去获取锁,挂起没有获得到锁的节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
意思是,当我们节点进来时,先看看当前队列有没有节点在处理,如果有就去排队,然后等待(被挂起,等快到自己的是被唤醒),如果你不是排队的第一节点,那么直接挂起。当被挂起时,他的waitStatus
被设置为SIGNAL(-1)
,表示需要唤醒,随后通过park进入阻塞状态
另外,源码中有两段help GC,这里设置为空的原因是,头节点不参与排队,因为他已经获取到了同步状态,现在是需要进行业务逻辑操作的,而在业务逻辑操作完之后,该头结点肯定需要进行垃圾回收,防止空间浪费,这里就涉及到GC Root,如果还有对象引用的话,垃圾回收器是不会回收他的,所以要将他的属性置空,方便垃圾回收
当每次线程调用时都会先调用tryAcquire
,失败后才会挂载到队列,因此acquire
实现默认为非公平锁
释放锁的方法release()
释放锁的过程比较简单
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
从头结点开始唤醒后继节点,
共享锁
加锁方法acquireShared
加锁方法和独占锁类似,
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
区别是,独占锁返回的是bool,共享锁是int
- doAcquireShared:将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因为这是个自旋,所以会传递唤醒后续的阻塞节点
我们在源码内说的,将setHeadAndPropagate
单独摘出来说
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
源码内的注释写的很详细
因为在这里面,是获取到node.next的节点,所以实际是共享锁的传播解锁(如果条件合适)
解锁方法releaseShared
共享锁的解锁也和上面的类似,不过独占锁的解锁和加锁都是bool类型,共享锁只有加锁时int类型,更方便控制共享锁的数量吧
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
解锁和独占锁的解锁也类似,释放资源后唤醒后继,
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)