文章目录
- AQS介绍
-
- AQS源码结构
- CLH同步队列
- state同步状态
- 独占式同步状态获取与释放
-
- 共享式同步状态获取与释放
看了很多帖子,原理说啥的都有,算了还是自己整理吧,既然没个统一,那我就自己整理一下🤣🤣🤣,刚升博客专家,我说的就对 🧐🧐🧐
AQS介绍
AQS概念
AbstractQueuedSynchronized(AQS),是抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架**,许多同步类实现都依赖于它,例如常用的ReentrantLock/Semaphore/CountDownLatch等
AQS是一个用来构建锁和其他同步组件的基础框架,使用AQS可以简单且高效地构造出应用广泛的同步器,它提供了一个FIFO队列(先进先出队列),可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件。
模型整体的工作流程如下所示:
AQS模式分类
AQS支持独占锁(exclusive)和共享锁(share)两种模式。
但是无论是独占锁还是共享锁,本质上都是对AQS类内部的一个变量state的获取。
这里的 state是一个原子的int变量,用来表示锁状态、资源数等。
变量state源码示例如下(不急,后面会讲到):
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
AQS核心思想
AQS的核心思想也很简单:
这个机制AQS是用CLH同步队列锁实现的,即将暂时获取不到锁的线程加入到等待队列中。
AQS源码结构
CLH同步队列
我们的 AQS底层的数据结构采用CLH队列,AQS依赖它来完成同步状态的管理,CLH队列是一个FIFO双向队列,即不存在队列的实例,仅存在节点之间的关联关系。
原理如下:AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。当共享资源被某个线程占有(即当前线程获取同步状态失败),AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
如下结构图就是一个CLH同步队列:
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next)
而在源码的注释中,也能看到这样的结构介绍:
翻译如下:
/**
* 等待队列节点 class.
*
* <p>等待队列是“CLH”的变体(Craig、Landin和
* Hagersten)锁定队列。CLH锁通常用于
* 自旋锁。
* ...........
* <p>要排队进入CLH锁,您可以将其作为新的
* 尾部。要退出队列,只需设置head字段。
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
* ..............
结点Node源码如下:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
其中首先节点的类型是AQS的静态内部类Node,Node节点的状态有如下四种,AQS中关于Node状态的源码如下
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
-
CANCELLED = 1:表示当前节点从同步队列中取消,即当前线程被取消
-
SIGNAL = -1:表示后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行
-
CONDITION = -2:表示当前节点在等待condition,也就是在condition queue中
-
PROPAGATE = -3:表示下一次共享式同步状态获取将会无条件传播下去
state同步状态
AQS中的共享资源是使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。
AQS使用CAS对该同步状态进行原子操作实现对其值的修改。状态信息通过procted类型的getState,setState,compareAndSetState进行操作。
AQS中同步状态获取的源码如下:
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
因为 AQS 也只是一个抽象类,这个state在他的子类中,可以用于表示任意状态,例如:
- ReentrantLock用它来表示锁的持有者线程已经重复获取该锁的次数,而对于非锁的持有者线程来说,如果state大于0,意味着无法获取该锁,将该线程包装为Node,加入到同步等待队列里。
- Semaphore用它来表示剩余的许可数量,当许可数量为0时,对未获取到许可但正在努力尝试获取许可的线程来说,会进入同步等待队列,阻塞,直到一些线程释放掉持有的许可(state+1),然后争用释放掉的许可。
- FutureTask用它来表示任务的状态(未开始、运行中、完成、取消)。
- ReentrantReadWriteLock在使用时,稍微有些不同,int型state用二进制表示是32位,前16位(高位)表示为读锁,后面的16位(低位)表示为写锁。
- CountDownLatch使用state表示计数次数,state大于0,表示需要加入到同步等待队列并阻塞,直到state等于0,才会逐一唤醒等待队列里的线程。
在AQS类中没有同步器方法的具体实现,所以我们来具体看其子类的源码的实现原理
独占式同步状态获取与释放
首先同步状态的获取需要通过调用同步器acquire(int arg)方法可以获取同步状态,该方法中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后序线程对进行中断操作时,线程不会从同步队列中移出
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个同步状态获取主要的流程步骤如下:
1)首先调用自定义同步器实现tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态。
2)如果获取失败则构造同步节点(独占式Node.EXCLUSIVE)并通过addWaiter(Node ndoe)方法将该节点加入到同步队列的尾部,同时调用enq(node)通过for(;;)循环保证安全设置尾节点。
CLH队列入列也很简单,和数据结构一样就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。
如下是addWaiter(Node node)方法的源码:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在上面代码中,两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。
在enq(Node node)方法中,AQS通过“死循环(自旋,死循环是一种表象的说法)”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。
过程图如下:
后面黄色的Node是我们新加的结点,获取同步失败后被加到队尾
3)节点进入同步队列之后“自旋”,即acquireQueued(final Node node, int arg)方法,在这个方法中,当前node死循环尝试获取锁状态,但是只有node的前驱结点是Head才能尝试获取同步状态,获取成功之后立即设置当前节点为Head,并成功返回。否则就会一直自旋。
源码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
文字总结:
1)同步器会维护一个双向FIFO队列,获取同步失败的线程将会被构造成Node加入队尾(并且做自旋检查:检查前驱结点是否是Head);
2)当前线程想要获得同步状态,前提是其前驱结点是头结点,并且获得了同步状态;
3)当Head调用release(int arg)释放锁的同时会唤醒后继节点(即当前节点),后继节点结束自旋
出队列过程:
CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。
同步器的release方法:释放锁的同时,唤醒后继节点(进而时后继节点重新获取同步状态)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)唤醒处于等待状态的线程。
过程图如下:
流程图总结:
共享式同步状态获取与释放
共享锁跟独占式锁最大的不同就是:某一时刻有多个线程同时获取到同步状态,获取判断是否获取同步状态成功的关键,获取到的同步状态要大于等于0。而其他步骤基本都是一致的,还是从源码开始分析起:带后缀Share都为共享式同步方法。
1)acquireShared(int arg)获取同步状态:如果获取失败则加入队尾,并且检查是否具备退出自旋的条件(前驱结点是头结点并且能成功获取同步状态)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
2)doAcquireShared(arg):获取失败的Node加入队列,如果当前节点的前驱结点是头结点的话,尝试获取同步状态,如果大于等于0则在for(;;)中退出(退出自旋)。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3)releaseShared(int arg):释放同步状态,通过loop+CAS方式释放多个线程的同步状态。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)