Java基础学习之并发篇:手写阻塞队列ArrayBlockingQueue

2023-11-16

学习目标

我们都知道在并发编程中,阻塞队列在多线程中的场景特别有用,比如在生产和消费者模型中,生产者生产数据到队列,队列满时需要阻塞线程,停止生产。消费者消费队列,对队列为空时阻塞线程停止消费,在Java中有提供不同场景的阻塞队列,那么接下来我们将学习

  1. ReentrantLock的Condition原理
  2. BlockingQueue的定义
  3. 了解ArrayBlockingQueue的实现
  4. 如何手写一个阻塞队列

Condition原理

在学习阻塞队列之前,我们先需要弄清楚ReentrantLockCondition机制。我们知道使用synchronized结合Object上的wait和notify方法可以实现线程间的等待通知机制。Condition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单,扩展性更好。

  1. Condition能够支持不响应中断,而通过使用Object方式不支持
  2. Condition能够支持多个等待队列(new多个Condition对象),而Object方式只能支持一个
  3. Condition能够支持超时时间的设置,而Object不支持

具体看个小例子:

     	ReentrantLock lock = new ReentrantLock();
        Condition waitCond = lock.newCondition();
        Thread t1 = new Thread(() -> {
            System.out.println("before-wait...1");
            try {
                lock.lock();
                waitCond.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println("after-wait...1");
        });

        Thread t2 = new Thread(() -> {
            System.out.println("before-wait...2");
            try {
                lock.lock();
                waitCond.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println("after-wait...2");
        });
        t1.start();
        t2.start();
        Thread.sleep(1);
        lock.lock();
        waitCond.signalAll();
        System.out.println("signalAll");
        lock.unlock();

输出

before-wait...2
before-wait...1
signalAll
after-wait...2
after-wait...1

Process finished with exit code 0

可以看到,想要获得一个Condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用是AQS中的内部类ConditionObject

Condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。Condition内部也是使用相似的方式,内部维护了一个单向的 等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。


BlockingQueue

Java中定义了一个BlockingQueue这样的接口,继承Queue[public interface Queue<E> extends Collection<E>],BlockingQueue提供四种不同的处理方法

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    
  	boolean remove(Object o);
  	
    boolean offer(E e);
    E poll();
    
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    void put(E e) throws InterruptedException;
    
    E take() throws InterruptedException;
抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除方法 remove(o) poll() take(o) poll(o, timeout, timeunit)
检查方法 element() peek()

在JDK7提供了7个阻塞队列分别是:

  1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  5. SynchronousQueue:一个不存储元素的阻塞队列。
  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
  7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

其中ArrayBlockingQueue是一个由数组组成的有界阻塞队列,Array顾名思义内部队列定义的是一个数组存储数据,既然是数组必然需要定义长度。
具体类型结构:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    }
  • items:一个Object的数组
  • tackIndex:出队列的下标
  • putIndex:入队列的下标
  • count:队列中元素的数量
  • lock:ReentrantLock类型的锁
  • notEmpty、notFull:Condition的等待

生产者和消费者共用lock锁,数组满时阻塞插入,即生产。数组空时阻塞获取,即消费。结合ReentranLock和Condition感觉好像懂了。如果是你,可以试试怎么实现。


实现ArrayBlockingQueue

先按上定义几个变量

 	final static int MAX = 10;
    LinkedList<Integer> queue = new LinkedList<>();

    ReentrantLock lock = new ReentrantLock();
    Condition full = lock.newCondition();
    Condition emtpy = lock.newCondition();

再看看生产者怎么构建,首先对于进来的每个线程上锁,判定当前数组长度大等于最大值,那就使当前线程进入等待队列,否则添加数据进数组。当然,对于长度为1的时候需要唤醒所有的消费线程。故可以实现如下:

	// Producer
    public void create() throws InterruptedException {
        lock.lock();
        if (queue.size() == MAX) {
            full.await();
            return;
        }
        int data = readData(); // 1s
        if(queue.size() == 1) {
            emtpy.signalAll();
        }
        queue.add(data);
        lock.unlock();
    }

消费者亦同理 :

    // Comsumer
    public void calculate() throws InterruptedException {

        lock.lock();
        if (queue.size() == 0) {
            emtpy.await();
            return;
        }
        int data = queue.remove();
        System.out.println("queue-size:" + queue.size());
        if(queue.size() == MAX - 1) {
            full.signalAll();
        }
        lock.unlock();
    }

那么我们手写的生产消费模型全部代码如下:

public class ProducerCustomerModel {
    final static int MAX = 10;
    LinkedList<Integer> queue = new LinkedList<>();
    ReentrantLock lock = new ReentrantLock();
    Condition full = lock.newCondition();
    Condition emtpy = lock.newCondition();
    
    int readData() throws InterruptedException {
        Thread.sleep((long)Math.random()*1000);

        return (int)Math.floor(Math.random()*1000);
    }
    // Producer
    public void readDb() throws InterruptedException {
        lock.lock();
        if (queue.size() == MAX) {
            full.await();
            return;
        }
        int data = readData(); // 1s
        if(queue.size() == 1) {
            emtpy.signalAll();
        }
        queue.add(data);
        lock.unlock();
    }

    // Comsumer
    public void calculate() throws InterruptedException {

        lock.lock();
        if (queue.size() == 0) {
            emtpy.await();
            return;
        }
        int data = queue.remove();
        System.out.println("get date"+data+" and queue-size:" + queue.size());
        if(queue.size() == MAX - 1) {
            full.signalAll();
        }
        lock.unlock();
    }


    public static void main(String[] argv) {
        ProducerCustomerModel p = new ProducerCustomerModel();
        for(int i = 0; i < 100; i++) {
            new Thread(() -> {
                while (true) {
                    try {
                        p.readDb();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        new Thread(() -> {
            while(true) {
                try {
                    p.calculate();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

发现输出有如下数据:

get date758 and queue-size:2
get date949 and queue-size:1
get date15 and queue-size:0
get date23 and queue-size:9
get date143 and queue-size:8
get date690 and queue-size:7
get date112 and queue-size:6
get date253 and queue-size:5
get date925 and queue-size:4
get date593 and queue-size:3
get date779 and queue-size:2
get date495 and queue-size:1
get date989 and queue-size:0

Process finished with exit code -1

其实回过头再在来看ArrayBlockingQueue其内部发现实现好像差不多就是这样的

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java基础学习之并发篇:手写阻塞队列ArrayBlockingQueue 的相关文章

  • 使用 Java 的 Apache Http 摘要身份验证

    我目前正在开发一个 Java 项目 但无法使 http 摘要身份验证正常工作 我尝试使用 Apache 网站 但没有帮助 我有一个需要 HTTP 摘要身份验证的网站 DefaultHttpClient httpclient new Defa
  • java中监视目录变化

    我正在使用 WatchService 来监视目录中的更改 特别是目录中新文件的创建 下面是我的代码 package watcher import java nio file import static java nio file Stand
  • TreeMap 删除所有大于某个键的键

    在项目中 我需要删除键值大于某个键的所有对象 键类型为Date 如果重要的话 据我所知TreeMapJava中实现的是红黑树 它是一种二叉搜索树 所以我应该得到O n 删除子树时 但除了制作尾部视图并一一删除之外 我找不到任何方法可以做到这
  • eclipse行号状态行贡献项是如何实现的?

    我需要更新状态行编辑器特定的信息 我已经有了自己的实现 但我想看看 eclipse 贡献项是如何实现的 它显示状态行中的行号 列位置 谁能指点一下 哪里可以找到源代码 提前致谢 亚历克斯 G 我一直在研究它 它非常复杂 我不确定我是否了解完
  • Java 的支持向量机?

    我想用Java编写一个 智能监视器 它可以随时发出警报detects即将到来的性能问题 我的 Java 应用程序正在以结构化格式将数据写入日志文件
  • Thymeleaf 3 Spring 5 映射加载字符串而不是 HTML

    我正在尝试将 Spring 5 和 Thymeleaf 3 一起配置 我正在 Eclipse 上工作 我使用 全新安装 构建并使用 springboot run 运行应用程序 我已经设置了一个控制器和几个模板 但 Thymeleaf 似乎找
  • 如何调试“com.android.okhttp”

    在android kitkat中 URLConnection的实现已经被OkHttp取代 如何调试呢 OkHttp 位于此目录中 external okhttp android main java com squareup okhttp 当
  • 如何在 Java 中向时间戳添加/减去时区偏移量?

    我正在使用 JDK 8 并且玩过ZonedDateTime and Timestamp很多 但我仍然无法解决我面临的问题 假设我得到了格式化的Timestamp在格林威治标准时间 UTC 我的服务器位于某处 假设它设置为Asia Calcu
  • 在 Java 中如何找出哪个对象打开了文件?

    我需要找出答案哪个对象在我的 Java 应用程序中打开了一个文件 这是为了调试 因此欢迎使用工具或实用程序 如果发现哪个对象太具体了 这class也会很有帮助 这可能很棘手 您可以从使用分析器开始 例如VisualVM http visua
  • 如何在 Spring 中使 @PropertyResource 优先于任何其他 application.properties ?

    我正在尝试在类路径之外添加外部配置属性资源 它应该覆盖任何现有的属性 但以下方法不起作用 SpringBootApplication PropertySource d app properties public class MyClass
  • Java Applet 中的 Apache FOP - 未找到数据的 ImagePreloader

    我正在研究成熟商业产品中的一个问题 简而言之 我们使用 Apache POI 库的一部分来读取 Word DOC 或 DOCX 文件 并将其转换为 XSL FO 以便我们可以进行标记替换 然后 我们使用嵌入到 Java 程序中的 FOP 将
  • 蓝牙发送和接收文本数据

    我是 Android 开发新手 我想制作一个使用蓝牙发送和接收文本的应用程序 我得到了有关发送文本的所有内容逻辑工作 但是当我尝试在手机中测试它时 我看不到界面 这是Main Activity Code import android sup
  • Jetty、websocket、java.lang.RuntimeException:无法加载平台配置器

    我尝试在 Endpoint 中获取 http 会话 我遵循了这个建议https stackoverflow com a 17994303 https stackoverflow com a 17994303 这就是我这样做的原因 publi
  • 如何将 HTML 链接放入电子邮件正文中?

    我有一个可以发送邮件的应用程序 用 Java 实现 我想在邮件中放置一个 HTML 链接 但该链接显示为普通字母 而不是 HTML 链接 我怎样才能将 HTML 链接放入字符串中 我需要特殊字符吗 太感谢了 Update 大家好你们好 感谢
  • 如何在JPanel中设置背景图片

    你好 我使用 JPanel 作为我的框架的容器 然后我真的想在我的面板中使用背景图片 我真的需要帮助 这是我到目前为止的代码 这是更新 请检查这里是我的代码 import java awt import javax swing import
  • 为什么\0在java中不同系统中打印不同的输出

    下面的代码在不同的系统中打印不同的输出 String s hello vsrd replace 0 System out println s 当我在我的系统中尝试时 Linux Ubuntu Netbeans 7 1 它打印 When I
  • Java RMI - 客户端超时

    我正在使用 Java RMI 构建分布式系统 它必须支持服务器丢失 如果我的客户端使用 RMI 连接到服务器 如果该服务器出现故障 例如电缆问题 我的客户端应该会收到异常 以便它可以连接到其他服务器 但是当服务器出现故障时 我的客户端什么也
  • Spring RESTful控制器方法改进建议

    我是 Spring REST 和 Hibernate 的新手 也就是说 我尝试组合一个企业级控制器方法 我计划将其用作未来开发的模式 您认为可以通过哪些方法来改进 我确信有很多 RequestMapping value user metho
  • java'assert'和'if(){}else exit;'之间的区别

    java和java有什么区别assert and if else exit 我可以用吗if else exit代替assert 也许有点谷歌 您应该记住的主要事情是 if else 语句应该用于程序流程控制 而assert 关键字应该仅用于
  • Java 和/C++ 在多线程方面的差异

    我读过一些提示 多线程实现很大程度上取决于您正在使用的目标操作系统 操作系统最终提供了多线程能力 比如Linux有POSIX标准实现 而windows32有另一种方式 但我想知道编程语言水平的主要不同 C似乎为同步提供了更多选择 例如互斥锁

随机推荐

  • JAVA 获取指定月份的每周的开始日期和结束日期

    1 第一种情况 从1号开始到月份最后一天结束 代码如下 private DateTimeFormatter dateTimeFormatter DateTimeFormatter ofPattern yyyy MM dd public Li
  • Arduino和Python卡尔曼滤波对四元数进行姿态测定

    在本文中 我将演示使用EKF 扩展卡尔曼滤波 对四元数确定姿态的实现 并说明将多个传感器数据融合在一起以使系统正常工作的必要性 将要使用的传感器是陀螺仪 加速度计和磁力计 Arduino用于从传感器读取数据 但是数据处理将在python中完
  • 移动端开发框架

    总体概述 现在比较流行的移动APP开发框架有以下六种 网页 混合 渐进 原生 桥接 自绘 前三种体验与Web的体验相似 后三种与原生APP的体验相似 这六种框架形式 都有自己适用的范围 无所谓好坏 适用就是好 网页应用适用于传统网站APP化
  • 手写vue(三)模板渲染解析

    一 目标 创建一个Vue实例时 我们可以传入el配置项 去指定一个DOM元素作为Vue容器 而这个Vue容器中 可以使用例如插值表达式等Vue框架提供的语法 并且能够渲染到浏览器页面上 而浏览器并不能解析这些Vue语法 因此 Vue框架是通
  • python: How to Create a Python Package

    StudentScoreInfo py 学生成绩类 date 2023 06 16 edit Geovin Du geovindu 涂聚文 ide PyCharm 2023 1 python 11 import datetime impor
  • GAN生成手写数字实例讲解Colab使用教程

    Colab 全称Colaboratory 是谷歌提供的一个在线工作平台 可以与谷歌云盘协作使用 我们可以在Colab平台上运行代码 而且大部分常用的包都已经安装好 不需要再进行安装 也不需要进行环境配置 非常方便快捷 对于初学者来说非常友好
  • 颠覆传统逻辑的C程序

    1 在main之前运行的C代码 before main c include
  • k8s 部署spring cloud项目

    微服务架构是一项在云中部署应用和服务的新技术 大部分围绕微服务的争论都集中在容器或其他技术是否能很好的实施微服务 而红帽说API应该是重点 微服务可以在 自己的程序 中运行 并通过 轻量级设备与HTTP型API进行沟通 关键在于该服务可以在
  • LouvainMethod分布式运行的升级之路

    1 背景介绍 Louvain是大规模图谱的谱聚类算法 引入模块度的概念分二阶段进行聚类 直到收敛为止 分布式的代码可以在如下网址进行下载 GitHub Sotera spark distributed louvain modularity
  • Windows下SpringBoot连接Redis的正确使用姿势

    1 安装Redis 1 1通过wsl安装redis 参考官方安装文档 需要在wsl2上安装redis服务 注意我们启动redis的方式 First way 采用官方文档的方式 sudo service redis server start
  • Python自学——The One Day(Python基础——介绍)

    文章目录 Python基础 介绍 前言 编译型语言和解释型语言 Python是什么 Python的优缺点是什么 优点 缺点 Python的运行过程 Python能干什么 怎样学好Python Python基础 介绍 前言 编译型语言和解释型
  • 2014年10月4399校招笔试--游戏岗

    今天参加了4399的笔试 总的来说题目不难 不过有些题没答上来 特别是选择题最后几个关于图像的题目22 25 真心不会
  • vivado中的常用AXI接口IP核

    AXI是xilinx中常用的数据接口 种类和引脚数量极多 1 AXI GPIO AXI GPIO为AXI接口提供了一个通用的输入 输出接口 可以配置成单通道和双通道 每个通道的位宽都可以单独设置 另外 通过打开或者关闭三通道缓冲器 AXI
  • 使用 ST-LINK 烧录程序到 STM32

    前言 之前博主在使用单片机时 烧录程序用的都是串口的方式 最近公司定制了一个工业版单片机目前只支持使用 ST LINK 烧录 因此博主收集了一些资料 并整理了烧录程序的流程用于分享和后期自己回顾 准备工作 准备烧录编程器 博主直接在网上买了
  • 图像仿射变换原理4:组合变换及对应变换矩阵

    老猿Python博文目录 https blog csdn net LaoYuanPython 仿射变换博文传送门 带星号的为付费专栏文章 图像仿射变换原理1 齐次坐标来龙去脉详解 图像仿射变换原理2 矩阵变换 线性变换和图像线性变换矩阵 图
  • Linux下 VS Code 安装与 C 编程环境配置!

    对于多文件的C项目 大部分人会选择使用 cmake 来管理编译过程 对于精力充沛的朋友来说 也可以学习一下使用这个强大的工具 但我觉得如果只想在VS Code里写几行代码应对当前需求 没必要再去学习一个完全陌生的东西 也没必要把配置过程复杂
  • 捕鱼游戏java源码

    package fishlord import java awt Color import java awt Font import java awt Graphics import java awt event MouseAdapter
  • eclipse 报错 java.lang.NullPointerException at org.eclipse.jface.resource.JFaceResources.getResources

    java lang NullPointerException at org eclipse jface resource JFaceResources getResources JFaceResources java 209 删除文件 wo
  • MySQL——流程控制(IF、CASE、LOOP、WHILE、REPEAT、LEAVE、ITERATE)

    解决复杂问题不可能通过一个 SQL 语句完成 我们需要执行多个 SQL 操作 流程控制语句的作用就是控制存储过程中 SQL 语句的执行顺序 是我们完成复杂操作必不可少的一部分 接下来让我们一起开始学习吧 流程控制 只要是执行的程序 流程就分
  • Java基础学习之并发篇:手写阻塞队列ArrayBlockingQueue

    学习目标 我们都知道在并发编程中 阻塞队列在多线程中的场景特别有用 比如在生产和消费者模型中 生产者生产数据到队列 队列满时需要阻塞线程 停止生产 消费者消费队列 对队列为空时阻塞线程停止消费 在Java中有提供不同场景的阻塞队列 那么接下