来谈谈 BlockingQueue 阻塞队列实现类 java.util.concurrent.LinkedBlockingQueue(JDK1.8 源码分析)

2023-11-18

LinkedBlockingQueue源码刨析



前言

public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable AbstractQueue:父类为Queue
BlockingQueue:阻塞队列接口
Serializable :序列化标记接口

提示:以下是本篇文章正文内容,下面案例可供参考

一、LinkedBlockingQueue源码部分

1.构造方法

//没有指定队列大小时默认队列为Integer.MAX_VALUE
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

//指定队列大小构造
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException(); //队列大小不能小于0
    this.capacity = capacity; //设置队列容量
    last = head = new Node<E>(null); //设置一个空的头节点,便于操作
}

//此种方法创建队列时队列容量为Integer.MAX_VALUE
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE); //初始化队列
    final ReentrantLock putLock = this.putLock; //获取写锁
    putLock.lock(); //加锁
    try {
        int n = 0;
        for (E e : c) { //遍历要加入队列集合
            if (e == null) //加入队列中的元素不能为空,为空时抛出异常
                throw new NullPointerException();
            if (n == capacity) //超出队列的最大容量时抛出非法状态异常
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e)); //入队操作
            ++n; //计数器加加
        }
        count.set(n); //这是一个原子操作
    } finally {
        putLock.unlock(); //释放锁
    }
}

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node; //队列尾部元素指向新加入的元素,last指向队尾
}
  1. 可以看出空构造方法默认队列大小为 Integer.MAX_VALUE,加入队列元素过于庞大时可能消耗大小的内存空间。
  2. 从 LinkedBlockingQueue(Collection<? extends E> c)构造方法发中可以看出与ArrayBlockingQueue 一样队列中不能加入空元素否则抛出异常。
  3. 我们可以传入参数指定链表队列大小

2.成员变量

//队列最多可以有多少个元素(没有设置时默认为Integer.MAX_VALUE)
private final int capacity;

//写锁
private final ReentrantLock putLock = new ReentrantLock();

//读锁
private final ReentrantLock takeLock = new ReentrantLock();

//当前队列中元素的个数
private final AtomicInteger count = new AtomicInteger();

//最开始last=head=头节点 (该节点不存放任何元素,仅仅是便于链表操作)

//队列头部指针
transient Node<E> head;

//队列尾部指针
private transient Node<E> last;

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

  1. 从成员变量 putLock 和 takeLock 可以看出 LinkedBlockingQueue 读写锁是分离的,这里与ArrayBlockingQueue读写共用一把全局锁不同,因此 LinkedBlockingQueue 可以同时进行读写操作,也可以用于生产者消费则模式(ArrayBlockingQueue则不能)。
  2. 成员变量 count 是一个原子变量用于计算队列中元素的个数,为什么要设置成原子变量,因为我们使用了读写锁分离,因此可能同一时间有两个线程操作该变量,因此要保证其原子性。
  3. 与 ArrayBlockingQueue 相同 LinkedBlockingQueue 有两个 Condition 但是该等待队列创建的锁是不同的(分别是 putLock 和 takeLock)。
  4. head 和 last 可以看出队列底层使用了链表。

3.主要方法

1.入队操作

offer方法

//向队列首部加入一个元素(当队列满时直接返回false不会阻塞)
public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); //加入元素值为null时抛出空指针异常
    final AtomicInteger count = this.count; //获取队列大小
    if (count.get() == capacity) //队列满时
        return false; //直接返回false
    int c = -1;
    Node<E> node = new Node<E>(e); //创建新节点
    final ReentrantLock putLock = this.putLock; //获取写入锁
    putLock.lock(); //加锁
    try {
        if (count.get() < capacity) { //队列当前大小小于队列最大容量
            enqueue(node); //新节点加入到队列尾部
            c = count.getAndIncrement(); //队列大小加一并旧值保存到临时变量c中
            if (c + 1 < capacity) //?为什么要加一getAndIncrement返回的是旧值
                notFull.signal(); //唤醒准备入队中线程的一个
        }
    } finally {
        putLock.unlock(); //释放锁
    }
    if (c == 0) //说明之前队列为空(之前队列为空时,take才肯能阻塞,这提高了性能)
        signalNotEmpty(); //唤醒队列中准备出队中线程的一个
    return c >= 0; //队列加入成功返回true(队列满时c=-1)
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();//加入元素值为null时抛出空指针异常
    long nanos = unit.toNanos(timeout); //将指定的超时时间转换成纳秒
    int c = -1;
    final ReentrantLock putLock = this.putLock; //获取写入锁
    final AtomicInteger count = this.count; //获取队列当前大小变量
    putLock.lockInterruptibly(); //加锁操作
    try {
        while (count.get() == capacity) { //队列满时会进行循环阻塞
            if (nanos <= 0) //设置时间<=0时直接返回false
                return false;
            nanos = notFull.awaitNanos(nanos); //等待指定时间如果队列仍然满返回false
        }
        enqueue(new Node<E>(e)); //向队列尾部加入一个新的元素
        c = count.getAndIncrement(); //当前队列大小加一并且保存在临时变量c中
        if (c + 1 < capacity) //如果队列中还没满(还可以放入一个元素) 
            notFull.signal(); //唤醒准备入队的等待的第一个线程
    } finally {
        putLock.unlock(); //释放锁
    }
    if (c == 0) //在该方法执行enqueue(new Node<E>(e));方法后一个线程进行了出队操作,              //使得队列为空了
        signalNotEmpty(); 
    return true; //入队成功直接放回true
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock; //获取读锁
    takeLock.lock(); //加锁
    try {
        notEmpty.signal(); //唤醒准备进行出队操作的第一个线程
    } finally {
        takeLock.unlock(); //释放锁
    }
}

put方法

put方法
//此方法在队列尾部加入一个元素,队列满时可被阻塞,该方法也可以在加锁期间被中断
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); //加入队列的元素不能为null
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e); //创建一个新的节点
    final ReentrantLock putLock = this.putLock; //获取写锁
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); //加锁(该加锁方式中断大于释放锁)
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) { //队列已满
            notFull.await(); //进入阻塞状态
        }
        enqueue(node); //入队操作
        c = count.getAndIncrement(); //队列中的元素加一个,并返回旧值到c变量中去
        if (c + 1 < capacity) //当前队列元素仍然未满
            notFull.signal(); //唤醒其他等待入队的线程中的一个
    } finally {
        putLock.unlock(); //释放锁
    }
    if (c == 0) //c=0说明之前队列是空的,此时才可能有出队线程被阻塞
        signalNotEmpty(); //唤醒出队线程中的一个
}

2.出队操作

poll方法

//取出队首的元素
public E poll() {
    final AtomicInteger count = this.count; //获取队列中元素个数变量
    if (count.get() == 0) //队列中不含元素时直接返回null
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock; //获取读锁
    takeLock.lock(); //加锁
    try {
        if (count.get() > 0) { //队列中含有元素时
            x = dequeue(); //取出队首元素
            c = count.getAndDecrement();//队列中元素减一,并将旧值存在 c中
            if (c > 1) //说明此时队列中还含有元素
                notEmpty.signal(); //唤醒哪些阻塞在出队操作的线程中的一个
        }
    } finally {
        takeLock.unlock(); //释放锁
    }
    if (c == capacity)//说明出队操作前队列时满的,此时才有可能有准备入队的线程在阻塞
        signalNotFull();//唤醒哪些阻塞在入队操作的线程中的一个
//返回队首值(可能为null)
//因为在执行 count.get() == 0 之后其他执行出队操作的线程先于他出队,他在出队时   可能队列中不含元素。
    return x; 
}

//取出队首元素
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread(); //断言持有读锁
    // assert head.item == null; 
    Node<E> h = head; //头节点该节点为辅组节点
    Node<E> first = h.next; //队列真正的头部
    h.next = h;  //把辅组节点指向自身
    head = first; //队列头节点变为新的辅组节点
    E x = first.item; //保存之前队列头节点的值
    first.item = null; //辅组节点值置为null
    return x; //返回之前队列头节点的值
}
//此方法与之前出队操作的offer大同小异(这里不做过多解释)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法

take方法
//取出队首元素,队列空时线程被阻塞,该方法可被线程中断
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count; //获取队列元素大小变量
    final ReentrantLock takeLock = this.takeLock; //获取读锁
    takeLock.lockInterruptibly(); //加锁操作
    try {
        while (count.get() == 0) { //当队列中元素为空时
            notEmpty.await(); //线程进入阻塞状态,并且释放掉所持有的锁
        }
        x = dequeue(); //出队操作
        c = count.getAndDecrement(); //队列中元素个数减一,并且将原来的旧值保存起来
        if (c > 1) //原来队列中元素是大于一的(说明队列中还有元素)
            notEmpty.signal();//唤醒哪些阻塞在等待出队操作的线程中的一个
    } finally {
        takeLock.unlock(); //释放锁
    }
    if (c == capacity) //原来队列是满的,此情况下才有可能有线程阻塞在入队操作
        signalNotFull();//唤醒哪些阻塞在等待入队操作的线程中的一个

    return x;
}

peek方法

//获取队列首部元素值,并不是取出队列首部节点
public E peek() {
    if (count.get() == 0) //队列为空时
        return null; //直接放回null
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock(); //加上读锁
    try {
        Node<E> first = head.next; //队首节点为头节点的下一个节点
        if (first == null) //这里判断为null是因为可能在count.get() != 0之后有新的线程进行了出队操作,之后队列为空
            return null;
        else //这里的first不可能为空了,因为此线程已经加锁了,其他线程无法出队
            return first.item; //返回队首元素的值
    } finally {
        takeLock.unlock(); //释放锁
    }
}

总结LinkedBlockingQueue阻塞队列的入队出队操作

入队操作:

  1. offer(E e)方法:该方法向队列尾部加入一个元素,队列满时直接返回false
  2. offer(E e, long timeout, TimeUnit unit)方法:该方法向队列尾部加入一个元素,队列满时不会直接返回,而是阻塞一个timeout > 0的时间,阻塞时所持有的锁被释放,直到其他线程唤醒(如果队列不为空了,进行入队操作,如果队列仍然为空,判断是否超时,超时直接返回false,不超时继续阻塞),或者超时(直接返回false),或者被中断。当然如果最开始timeout设置的值是<=0且队列也为满是直接返回false
  3. (E e)方法:向队列尾部加入一个元素,如果队列已经满了直接阻塞当前线程(阻塞线程时会释放掉所持有的锁),直到线程被中断或者被其他线程唤醒

出队操作:

  1. poll()方法:返回队首元素,如果队列为空时直接返回null
  2. take()方法:返回队首元素,与put(E e)方法类似
  3. peek()方法:获取队首元素的值,并不进行出队操作(队列并没有改变),如果队列为空时直接返回null

总结

从源代码可以看出LinkedBlockingQueue是一个以head辅组节点为头节点的链表,指针first指向队列首部,last指向队列的尾部,当first = last = head时或count = 0 时队列为空,注意count是一个原子变量,我们对他的加加减减操作都是原子的,之所以设置成原子变量是因为,我们的入队和出队操作可以同时进行,因为入队和出队操作分别持有一把锁,为putLock和takeLock,而count是一个共享于入队和出队操作的变量,把他设置成原子变量可以避免因并发而产生的错误(如:count = 9 ,入队操作读取的count=9,出队操作读取的count=9,假设先入队count = 9 + 1 ,再出队 count = 9 – 1 ,最后count = 8 导致数据不一致性)。而我们的ArrayBlockingQueue中的count是一个普通的int变量,因为他的入队和出队操作都是持有一把全局的锁lock,所以入队和出队不能同时进行,尽管count时共享,但在一个时刻只有一个操作再操作他,不会有引发上面提到的数据不一致的问题。

ArrayBlockingQueue 和 LinkedBlockingQueue区别:

  1. 底层数据结构不同 前者使用数组,后者使用链表。数组查询效率可以达到O(1),链表O(N),数组插入|删除时可能需要移动大量元素效率为O(N),链表插入|删除操作为O(1)但是链表插入操作仍需先查询O(N),数组内存空间是连续的空间,链表不是。
  2. 容量指定不同,前者在构造时必须指定容量,并且指定多大容量直接分配一块连续的内存空间,而后者只需要指定最大容量,并且在使用时才会分配空间。
  3. 使用的锁数量不同,前者读写操作共享一把相同的锁,因此在一个时刻只能进行读或者写一个操作,后者使得读写锁分离即使用了两把锁,因此可以同时进行读写操作,因此后者可以用于生产者消费者模式而前者则不行。
  4. 统计队列当前个数变量不同,前者使用的普通的int count,因为全局共享一把锁的缘故次方式是安全的,而后者使用了AtomicInteger 原子变量,因为同时可以进行读写操作因此设计原子变量是必要的。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

来谈谈 BlockingQueue 阻塞队列实现类 java.util.concurrent.LinkedBlockingQueue(JDK1.8 源码分析) 的相关文章

随机推荐

  • CNN,RNN,LSTM区别

    一 CNN 卷积神经网络 在机器学习中 卷积神经网络是一种深度前馈人工神经网络 已成功地应用于图像识别 1 卷积神经网络 是一种前馈神经网络 人工神经元可以响应周围单元 可以进行大型图像处理 卷积神经网络包括卷积层和池化层 卷积神经网络包括
  • 盒子模型大详解

    文档流 网页是一个多层结构 设置样式也是一层一层设置的 最终我们看到的是最上面的那一层 文档流就是网页最底部 我们创建的元素默认都是在文档流中创建的 元素分为两种状态 在文档流 脱离文档流 元素在文档流的特点 块元素 1 独占一行 2 宽是
  • 手机iCloud储存空间已满,怎么解决?

    最近手机总是弹出iCloud储存空间已满 升级的话得花钱 以后再说的话 总感觉有点 不安 担心自己的照片啥的会存不了 所以特意查找了这种方法 如果有出现这种情况的朋友 可以试试 1 找出iCloud空间被哪些档案塞满 iiPhone或iPa
  • Linux之mmv命令批量替换文件名(超详细-python结合mmv)

    文章目录 一 前言 二 各系统安装mmv方法 2 1 CentOS 2 2 Ubuntu And Debain 2 3 MacOS 三 使用方法 3 1 常规使用 3 1 1 常规使用示例 3 2 携带参数使用 3 2 1 携带参数使用示例
  • vue3.x之isRef toRefs isRef readonly 公共数据配置 axios配置 路由配置

    isRef toRefs toRef 参数 源对象 源对象属性 可以用来为源响应式对象上的某个 property 新创建一个 ref 然后 ref 可以被传递 它会保持对其源 property 的响应式连接 也就是说源响应式对象 toRef
  • 3427: Dark roads

    http cs scu edu cn soj problem action id 3427 Description Economic times these days are tough even in Byteland To reduce
  • 向量二范数的求导问题

    现有目标函数 f x 1 2
  • ant design pro 可编辑表格

    import React useRef from react import PageHeaderWrapper from ant design pro layout import ProColumns ActionType TableDro
  • python elif 用法,在Python列表推导中对if / elif语句使用'for'循环

    I am trying to translate this for loop into a list comprehension a 1 2 3 4 5 6 7 8 9 result for i in a if i lt 3 result
  • 数据结构--单链表的插入&删除

    数据结构 单链表的插入 删除 目标 单链表的插入 位插 前插 后插 单链表的删除 单链表的插入 按为序插入 带头结点 ListInsert L i e 插入操作 在表L中的第i个位置上插入指定元素e 思路 找到第i 1个结点 将新结点插入其
  • ElasticSearch学习:ElasticSearch概述

    elasticsearch用于文本搜索的函数库Lucene ElasticSearch是基于此做的封装和增强 ElasticSearch 简称es es是一个开源的高拓展的分布式全文检索引擎 它可以近乎实施的存储 检索数据 本身扩展性很好
  • python代码行末的 \ 符号

    mlm l loss mlm Y hat reshape 1 vocab size mlm Y reshape 1 mlm weights X reshape 1 1 在代码中 是Python中的行继续符号 它用于表示代码行在物理上被分成多
  • 如何开始使用 GitLab 的 CLI 从终端管理 DevOps

    GitLab是面向现代软件交付团队的领先源代码控制和 CI CD 解决方案之一 它提供了一整套用于规划 构建和交付软件项目的功能 GitLab 通常使用其 Web UI 或 API 进行交互 这些选项对于以终端为中心的开发人员来说都不是特别
  • 强化学习笔记(1)-同策回合更新算法

    在我上一篇博客文章https blog csdn net gzroy article details 119509552中对21点的策略进行了研究 采用蒙特卡洛的方式来进行多次的模拟 通过对比不同策略的收益来找到最佳的策略 主要是通过概率的
  • layui的分页实例详解

    原 layui的分页实例详解 2018年09月20日 17 43 07 李什么泽 阅读数 11571 更多 分类专栏 layui分页 版权声明 本文为博主原创文章 遵循 CC 4 0 BY SA 版权协议 转载请附上原文出处链接和本声明 本
  • ndk错误总结

    1 ndk Unresolved inclusion
  • mongo在linux下的安装(实践记录)

    1 下载安装包 wget http fastdl mongodb org linux mongodb linux i686 1 8 2 tgz 下载完成后解压缩压缩包 tar zxf mongodb linux i686 1 8 2 tgz
  • IDEA Cannot resolve plugin org.apache.maven.pluginsmaven-jar-plugin2.4

    起因 最近在弄Maven项目 在使用IDEA创建Maven项目得时候一直报错 搞的我很头疼 网上搜索答案 都是修改Setting xml 配置本地仓库 然后我测试了好多次都不管用 但是根据错误信息他的确是Maven仓库配置得问题和IDEA
  • 初识GoogleTest

    1 初识GoogleTest 首先要了解googletest是做什么的 主要是单元测试框架 第二是googletest有什么优势 测试过程独立可以重复 测试组织与代码结构保持比较好的一致性 支持跨平台 失败后能够提供完整错误信息 同时支持失
  • 来谈谈 BlockingQueue 阻塞队列实现类 java.util.concurrent.LinkedBlockingQueue(JDK1.8 源码分析)

    LinkedBlockingQueue源码刨析 文章目录 LinkedBlockingQueue源码刨析 前言 一 LinkedBlockingQueue源码部分 1 构造方法 2 成员变量 3 主要方法 1 入队操作 offer方法 pu