AQS实现原理分析

2023-05-16

AQS

什么是AQS

AQS(AbstractQueuedSynchronizer)即抽象队列同步器,是一套可以实现同步锁机制的框架,是许多JUC内同步框架的基石。AQS通过一个FIFO的队列维护线程同步状态,实现类只需要继承该类,并重写指定方法既可以实现一套线程同步机制。

AQS原理

简单的说,AQS维护了一个volatile int state变量和CLH(三个人名字的缩写)双向队列,和一个ConditionObject(后续说明)组成

  • state:线程通过对state的修改来获取锁,如果修改失败将会被包装为节点挂载到队列中,等待持有锁的线程释放锁并唤醒队列中的节点
    /**
     * The synchronization state.
     */
    private volatile int state;

AQS中提供了获取和设置state的实现:

protected final int getState() {
        return state;
}

protected final void setState(int newState) {
        state = newState;
}
  • Node结构
static final class Node {
    volatile int waitStatus;	//节点状态
    volatile Node prev;			//双向链表前驱节点
    volatile Node next;			//后继节点
    volatile Thread thread;		//引用线程,头结点不包含线程
    Node nextWaiter;			//条件队列
}

waitStatus有五种取值:

  • CANCELLED = 1。节点引用线程由于等待超时或被打断时的状态。
  • SIGNAL = -1。后继节点线程需要被唤醒时的当前节点状态。当队列中加入的后继节点被挂起(block)时,其前驱节点会被设置为SIGNAL状态,表示该节点需要被唤醒。
  • CONDITION = -2。当节点线程进入condition队列时的状态。(见ConditionObject)
  • PROPAGATE = -3。仅在释放共享锁releaseShared时对头节点使用。(见共享锁分析)
  • 0。节点初始化时的状态。

获取锁失败的线程会被包装为节点,加入CLH双向队列中,结构如下:

image-20230320184937687

AQS提供的两种锁

在此之前,我们先举一个例子:

  • Mutex类继承AQS,实现其中方法,用来做获取锁和释放锁的操作
/**
 * @author 我见青山多妩媚
 * @date Create on 2023/3/13 21:56
 */
@Slf4j
public class Mutex extends AbstractQueuedSynchronizer {
	//独占锁加锁
    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0,1);
    }

    //独占锁解锁
    @Override
    protected boolean tryRelease(int arg) {
        return compareAndSetState(1,0);
    }
    
    //共享锁加锁
    @Override
    protected boolean tryRelease(int arg) {
        return compareAndSetState(1,0);
    }

    //共享锁解锁
    @Override
    protected int tryAcquireShared(int arg) {
        return super.tryAcquireShared(arg);
    }
       
}

其中compareAndSetState()方法:

image-20230320190108682

是一个CAS操作来确定更改值的

独占锁

我们Mutex类中写一个main方法,用来测试独占锁

public static void main(String[] args) {
        Mutex mutex = new Mutex();

        Thread t1 = new Thread(()->{
            log.debug("t1尝试获取锁");
            mutex.acquire(1);
            log.debug("t1获取锁成功");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.debug("t1准备释放锁");
            mutex.release(1);
            log.debug("t1锁已释放");
        },"t1");

        Thread t2 = new Thread(()->{
            log.debug("t2尝试获取锁");
            try {
                //确保t1第一个获取锁
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            mutex.acquire(1);
            log.debug("t2获取锁成功");
            mutex.release(1);
            log.debug("t2锁已释放");
        },"t2");

        t1.start();
        t2.start();
    }

运行结果:

17:52:42.623 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1尝试获取锁
17:52:42.623 [t2] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t2尝试获取锁
17:52:42.626 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1获取锁成功
17:52:45.632 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1准备释放锁
17:52:45.632 [t1] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t1锁已释放
17:52:45.632 [t2] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t2获取锁成功
17:52:45.632 [t2] DEBUG com.JUCTest.LockTest.AQSTest.Mutex - t2锁已释放

首先可以肯定是的,t1一定先获取锁,并且在3s后t2才能获取锁

AQS中提供了获取独占锁的方法acquire()和释放锁的方法release()

加锁方法acquire()
public final void acquire(int arg) {
    	//tryAcquire() 实现类设置的值
    	//如果获取锁失败
        if (!tryAcquire(arg) &&
            //acquireQueued 尝试找一个节点加锁,否则挂起
            //addWaiter() 用来加入节点到队列中
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //如果阻塞线程被打断,抛出异常
            selfInterrupt();
}

整个方法的执行流程为:

image-20230320193858894

流程分析:

  1. tryAcquire() 自己实现的方法
  2. addWaiter():向队列中添加当前节点
private Node addWaiter(Node mode) {
    	//将当前线程封装为节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
    	//tail 为指向尾结点的一个节点
        Node pred = tail;
        if (pred != null) {
            //如果尾结点不为空(队列不为空),那么将node连接到队列中
            node.prev = pred;
            //compareAndSetTail 通过CAS尝试更改尾结点(tail)为当前节点node
            if (compareAndSetTail(pred, node)) {
                //成功后,彻底连接到队列中
                pred.next = node;
                return node;
            }
        }
    	//修改tail为node节点,enq为修改tail的唯一方法,内部使用 自旋+CAS实现
        enq(node);
        return node;
    }

addWaiter将线程包装为独占节点,尾插式加入到队列中,如队列为空,则会添加一个空的头节点,内容也为null

  1. acquireQueued():将队列中的每个node尝试去获取锁,挂起没有获得到锁的节点
final boolean acquireQueued(final Node node, int arg) {
    	//失败状态
        boolean failed = true;
        try {
            //打断状态
            boolean interrupted = false;
            //自旋
            for (;;) {
                //获取当前节点的前驱节点
                final Node p = node.predecessor();
                //如果前驱节点为头节点,并且尝试获取锁成功
                if (p == head && tryAcquire(arg)) {
                    //将当前节点设置为头节点
                    //该方法中,会将head指向node节点,并且将该节点的前驱节点和Thread参数置空
                    //目的是为了GC help GC
                    setHead(node);
                    //将当前节点的下一个节点引用设置为null,方便垃圾处理器回收,下文讲
                    p.next = null; // help GC
                    //防止进入finally
                    failed = false;
                    //返回被打断状态
                    return interrupted;
                }
                //将前序节点设置为挂起(park),挂起没有获得到锁的节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //尝试获取锁失败,取消获取锁
            if (failed)
                cancelAcquire(node);
        }
    }

意思是,当我们节点进来时,先看看当前队列有没有节点在处理,如果有就去排队,然后等待(被挂起,等快到自己的是被唤醒),如果你不是排队的第一节点,那么直接挂起。当被挂起时,他的waitStatus被设置为SIGNAL(-1),表示需要唤醒,随后通过park进入阻塞状态

另外,源码中有两段help GC,这里设置为空的原因是,头节点不参与排队,因为他已经获取到了同步状态,现在是需要进行业务逻辑操作的,而在业务逻辑操作完之后,该头结点肯定需要进行垃圾回收,防止空间浪费,这里就涉及到GC Root,如果还有对象引用的话,垃圾回收器是不会回收他的,所以要将他的属性置空,方便垃圾回收

当每次线程调用时都会先调用tryAcquire,失败后才会挂载到队列,因此acquire实现默认为非公平锁

释放锁的方法release()

释放锁的过程比较简单

public final boolean release(int arg) {
    	//通过自定义的方法,尝试释放锁
        if (tryRelease(arg)) {
            //释放成功后,指向头节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //如果当前头节点不为空,并且不为就绪状态,那么唤醒头节点
                //如果头结点的下个节点为空或头结点的waitStatus > 0,
                //那么将从尾结点往前遍历,找到最后一个waitStatus<0的节点,将其唤醒(unpark)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

从头结点开始唤醒后继节点,

共享锁

加锁方法acquireShared

加锁方法和独占锁类似,

public final void acquireShared(int arg) {
    	//如果小于0,那么和独占锁一样,开始对立面节点进行自旋+CAS , 
        if (tryAcquireShared(arg) < 0)
            //当前是写锁时,获取锁失败,往下走 通过CAS + 自旋获取锁,因为上一个锁总该要释放的
            doAcquireShared(arg);
}

区别是,独占锁返回的是bool,共享锁是int

  • doAcquireShared:将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回
private void doAcquireShared(int arg) {
    	//获取锁失败后,以Node.SHARED挂载到队列尾部
        final Node node = addWaiter(Node.SHARED);
    	//其余和独占锁差不多 都是CAS+自旋
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //拿前一个节点
                final Node p = node.predecessor();
                //如果前一个节点是头节点
                if (p == head) {
                    //那么试着获取共享锁,返回共享锁得到的值
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //将当前节点和r传入,r表示当前有多少已经获得锁的线程数   
                        //这个方法将单独摘出来讲
                        //唤醒下一个节点的线程  (共享锁的传播)
                        setHeadAndPropagate(node, r);
                        //和独占锁一样,帮助GC的
                        p.next = null; // help GC
                        //如果被挂起的节点没结束到这,进行打断
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //和独占锁一样,挂起没有获得到锁的节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {   //和独占锁一样
            if (failed)
                cancelAcquire(node);
        }
    }

因为这是个自旋,所以会传递唤醒后续的阻塞节点

我们在源码内说的,将setHeadAndPropagate单独摘出来说

 private void setHeadAndPropagate(Node node, int propagate) {
     	//获取头节点
        Node h = head; // Record old head for check below
     	//将node设置为头结点
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
     	//注释的意思是,当发生以下情况之一,将给头结点(node)的下一个节点发信号
     	//如果当前节点是共享锁节点,那么往下可以释放锁
     	//相当于检查是否有需要释放锁的节点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            //获取下一个节点,传播释放锁
            Node s = node.next;
            //如果为空,或者为共享模式,将释放锁
            if (s == null || s.isShared())
                //释放锁,因为是在自旋内,所以可以唤醒后续多个节点
                doReleaseShared();
        }
    }

源码内的注释写的很详细

因为在这里面,是获取到node.next的节点,所以实际是共享锁的传播解锁(如果条件合适)

解锁方法releaseShared

共享锁的解锁也和上面的类似,不过独占锁的解锁和加锁都是bool类型,共享锁只有加锁时int类型,更方便控制共享锁的数量吧

 public final boolean releaseShared(int arg) {
     	//如果获取到释放锁
        if (tryReleaseShared(arg)) {
            //那么进行释放
            doReleaseShared();
            return true;
        }
        return false;
}
  • doReleaseShared:释放锁
private void doReleaseShared() {
    	//喜闻乐见 自旋
        for (;;) {
            //获取投节点
            Node h = head;
            //不为空,并且头结点后还有节点
            if (h != null && h != tail) {
                //获取头结点状态
                int ws = h.waitStatus;
                //如果是被阻塞
                if (ws == Node.SIGNAL) {
                    //CAS尝试设置为0,恢复正常,如果不能设置,继续自旋(如果是已经阻塞的节点,不让他阻塞)
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //类似独占锁的解锁,可以参考,唤醒后继
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))   //如果已经是初始化状态,CAS更改
                    continue;                // loop on failed CAS
            }
            //如果头部改变了,那么继续循环,否则退出
            if (h == head)                   // loop if head changed
                break;
        }
    }

解锁和独占锁的解锁也类似,释放资源后唤醒后继,

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

AQS实现原理分析 的相关文章

随机推荐

  • IDEA环境配置及Spring入门

    IDEA环境配置及Spring入门 以HelloWorld为例的简单介绍与开发步骤 前言 xff1a 本文主要内容一 xff1a IDEA环境配置二 xff1a 用传统Java编程方式写HelloWorld例子传统java xff1a 不使
  • Java入门必备知识

    一 HelloWord案例编写 在IDEA下 xff1a 右键新建类起一个类名 class就是一个类 public static void main String args System out println 34 HelloWorld
  • c#程序编写规范

    C 代码开发规范 文件状态 xff1a 草稿 正式 修改 文件标识 xff1a 当前版本 xff1a 1 1 作 者 xff1a Empty 联系电话 xff1a 最后更新 xff1a 2014 04 07 版本记录 日期 版本号 作者 说
  • 数据结构——存储结构和逻辑结构

    数据结构 存储结构和逻辑结构 1 存储结构 xff1a 数据对象在计算机中的存储表示称为数据的存储结构 xff0c 也称为物理结构 把数据对象存储到 计算机时 xff0c 通常要求既要存储各数据元素的数据 xff0c 又要存储数据元素之间的
  • C++获取网卡信息-注册表名称

    废话不多说 xff0c 直接上代码 xff1a include lt WinSock2 h gt include lt Iphlpapi h gt include lt iostream gt using namespace std pra
  • 全局初始化变量/全局未初始化变量/全局静态变量/局部变量的存储位置,作用域,与生命周期

    比如如下程序代码片段 span class hljs keyword int span a 61 span class hljs number 0 span span class hljs keyword char span p1 span
  • 建议收藏!PyCharm快捷键大全

    PyCharm 是Python中使用的有力工具 xff0c 它提供的功能非常强大 xff0c 正确使用里面的实用技巧 xff0c 能带来事半功倍的效果 本文给大家整理汇总一下Pycharm 的常用快捷键 xff0c 希望能帮助到小伙伴节约省
  • java调用http传json数据或字符串

    package com emm util import java io ByteArrayOutputStream import java io File import java io FileOutputStream import jav
  • 字符串通配(正则表达式)

    题目描述 对于字符串A xff0c 其中绝对不含有字符 和 再给定字符串B xff0c 其中可以含有 或 xff0c 字符不能是B的首字符 xff0c 并且任意两个 字符不相邻 exp中的 代表任何一个字符 xff0c B中的 表示 的前一
  • Scikit-Learn简介

    写给自己的备忘 1 简介 对 Python 语言 有所了解的科研人员可能都知道 SciPy 一个开源的基于 Python 的科学计算工具包 基于 SciPy xff0c 目前开发者们针对不同的应用领域已经发展出了为数众多的分支版本 xff0
  • 生产者消费者模式实现

    生产者消费者模式实现 synchronized wait和notify方式 一 基本流程 生产者在缓冲区未满时生产数据 xff0c 消费者在缓冲区有数据时从缓冲区中取数据 如果缓冲区已经满了 xff0c 则生产者线程阻塞 xff1b 如果缓
  • Linux SSH 登录其他机器

    ssh命令用于远程登录上Linux主机 常用格式 xff1a ssh l login name p port user 64 hostname 更详细的可以用ssh h查看 举例 不指定用户 xff1a ssh 192 168 0 11 指
  • Python中获取文件路径

    os path abspath os span class token punctuation span path span class token punctuation span abspath span class token pun
  • Matlab——矩阵打印显示

    Matlab 矩阵打印显示 我们这里定义一个矩阵 xff08 1 xff09 以列的形式将矩阵中量显示出来方式为A 2 取第i行到第j行的第k列数据为A i j k 3 取第i行到第j行的第m列到第n列的数据为A i j m n 4 已行的
  • 数据库报错 [ERR] 1118 - Row size too large (> 8126). Changing some columns to TEXT or BLOB or using ROW_F

    就是说 xff0c 远程的数据库量太大了 xff0c 我本地的数据库配置不对 xff0c 装不下了 xff08 1 xff09 从网上看的要改引擎 xff1a 从mysql的配置文件下 xff0c 找到my ini文件 xff0c 然后需要
  • “端到端”是什么意思

    1 传统机器学习VS深度学习 相对于深度学习 xff0c 传统机器学习的流程往往由多个独立的模块组成 xff0c 比如在一个典型的自然语言处理 xff08 Natural Language Processing xff09 问题中 xff0
  • TRON对接

    参考 官方API文档 https developers tron network referenceC 43 43 https github com aUscCoder TronWallet TRON资源模型 参考 https tronpr
  • 双精度浮点数double

    double类型介绍 双精度浮点数 xff08 double xff09 是计算机使用的一种数据类型 比起单精度浮点数 xff0c 双精度浮点数 double 使用 64 位 xff08 8字节 xff09 来存储一个浮点数 它可以表示十进
  • 解析SpringBoot启动类——起步依赖、自动配置

    分析SpringBoot的启动类 提出问题 xff1a 为什么通过启动类的 SpringApplication run方法就可以启动一个项目 xff1f span class token annotation punctuation 64
  • AQS实现原理分析

    AQS 什么是AQS AQS xff08 AbstractQueuedSynchronizer xff09 即抽象队列同步器 xff0c 是一套可以实现同步锁机制的框架 xff0c 是许多JUC内同步框架的基石 AQS通过一个FIFO的队列