阻塞队列-BlockingQueue

2023-05-16

对于Queue而言,BlockingQueue是主要的线程安全的版本,具有阻塞功能,可以允许添加、删除元素被阻塞,直到成功为止,blockingqueue相对于Queue而言增加了两个方法put/take元素

BlockingQueue接口

属于并发容器中的接口,在java.util.concurrent包路径下

BlockingQueue不接受null元素,加入尝试通过add\put、offer等添加一个null元素时,某些实现上会抛出NullPointExeception问题。

BlockingQueue是可以接口指定容量,如果给定的数据超过给定容量,便无法添加元素,如果没有指定容量约束,最大是Interger.MAX_VALUE值。

BlockingQueue实现类主要用于生产者-消费者队列,另支持Collection接口。

BlockingQueue实现了线程安全,所有排队方法都可以使用内部锁或者其他并发控制形式来达到线程安全的目的

三个主要实现类介绍:

ArrayBlockingQueue:有界阻塞队列

LinkedBlockingQueue:无界阻塞队列

SynchronousQueue:同步队列

ArrayBlockingQueue:有界队列

ArrayBlockingQueue有界队列底层实现是Object数组,数组大小是固定的,假如数组一端为头,另一端为尾,那么头和尾构建一个FIFO队列

属性及默认值

    //存储的数据 存放在数组中
    final Object[] items;
    //读数据位置
    int takeIndex;
    //写入数据位置
    int putIndex;
    //数据数量
    int count;
    //队列同步相关属性
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

通过ArrayBlockingQueue数据结构可知:首先是有一个数组 T[],用来存储所有的元素,由于ArrayBlockingQueue最终设置为一个不可扩展大小的Queue,所以这里items就是初始化就固定大小的数组(final),另外有两个索引,头索引takeIndex,尾索引putIndex,一个队列的大小count,要阻塞的话就必须用到一个锁和两个条件(非空,非满),这三个条件都是不可变类型

因为只有一把锁,所以任意时刻对队列只能有一个线程,意味着索引和大小的操作都是线程安全的,因此takeindex等不需要原子操作和volatile语义了

构造函数:

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

   //通过初始容量和是否公平性抢锁标志来进行实例化
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    //通过初始容量capacity、公平性标志fair和集合c
    public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    //数据是不能为null
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

put操作

可阻塞的添加元素

public void put(E e) throws InterruptedException {
        //检测插入数据不能为null
        checkNotNull(e);
        //添加可中断的锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) //容量满了需要阻塞
                notFull.await();
            //当前集合未满,执行插入操作
            insert(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        //通知take操作已经有数据吗,如果有take方法阻塞,此时可被唤醒来执行take操作
        notEmpty.signal();
    }

  //循环数组的特殊标志处理 ,如果是到最大值则重定向到0号索引
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

插入操作,在队列满的情况下会阻塞,直到有数据take出队列时才能结束阻塞,将当前数据插入队列

take方法

将数据从队列中移除

public E take() throws InterruptedException {
        //添加可中断的锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) //队列中没有数据时,需要阻塞,直到有数据put进入队列通知该操作可以继续执行
                notEmpty.await();
            //有数据时
            return extract();
        } finally {
            //释放锁
            lock.unlock();
        }
    }

    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        //发出通知 通知put方法,唤醒put操作
        notFull.signal();
        return x;
    }

ArrayBlockingQueue特点:

1、底层数据结构是数组,且数组大小一旦确定不可更改

2、不能存储null

3、阻塞功能是通过一个锁和两个隶属于该锁的Condition进行通信完成阻塞

Condition是在java 1.5中出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。

LinkedBlockingQueue:无界队列

1、底层数据结构

2、阻塞特征如何实现?有没有锁,有几把锁?

LinkedBlockingQueue有两个lock锁和两个Condition以及用于计数的AtomicInteger

底层数据结构是链表,都是采用头尾节点,每个节点执行下一个节点的结构

数据存储在Node结构中

引入两把锁,一个入队列锁,一个出队列的锁。满足同时有一个队列不满的Condition和一个队列不空的Condition。

为什么使用两把锁,一把锁是否可以?

一把锁完全可以的,一把锁意味着入队列和出队列同时只能有一个在进行,另一个必须等待释放锁,而从实际实现上来看,head和last是分离的,相互独立的,入队列实现是不会修改出队列的数据的,同理,出队列时也不会修改入队列,这两个操作实际是相互独立,这个锁相当于两个写入锁,入队列是一种写操作,操作head,出队列是一种写操作,操作的是tail,这两是无关的

SynchronousQueue:同步队列

SynchronousQueue:同步队列:每个插入操作必须等待另一个线程的移除操作,同样,任何一个移除操作都要等待另一个线程的插入操作,因此队列中其实没有任何一个数据,或者说容量为0,SynchronousQueue更像一个管道,不像容器,资源从一个方向快速的传递到另一个方向

队列对比

如果不需要阻塞队列,优先选择ConcurrentLinkedQueue;

如果需要阻塞队列,队列大小固定优先选择ArrayBlockingQueue,

队列大小不固定优先选择LinkedBlockingQueue;

如果需要对队列进行排序,选择PriorityBlockingQueue;

如果需要一个快速交换的队列,选择SynchronousQueue;

如果需要对队列中的元素进行延时操作,则选择DelayQueue

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

阻塞队列-BlockingQueue 的相关文章

  • 07_阻塞队列(BlockingQueue)

    目录 1 什么是BlockingQueue 2 认识BlockingQueue 3 代码演示 栈与队列概念 栈 Stack xff1a 先进后出 xff0c 后进先出 队列 xff1a 先进先出 1 什么是BlockingQueue 在多线
  • BlockingQueue深入分析

    1 BlockingQueue 定义的常用方法如下 抛出异常特殊值阻塞超时插入add e offer e put e offer e time unit 移除remove poll take poll time unit 检查element
  • BlockingQueue

    BlockingQueue 一 阻塞队列基本方法介绍 谈到线程池 xff0c 不得不谈到生产者 消费者模式 xff0c 谈到生产者 消费者 xff0c 就不得不谈到对应的数据结构 xff0c 谈到对应的数据结构不得不言 BlockingQu
  • 阻塞队列)详解

    一 前言 在新增的Concurrent包中 xff0c BlockingQueue很好的解决了多线程中 xff0c 如何高效安全 传输 数据的问题 通过这些高效并且线程安全的队列类 xff0c 为我们快速搭建高质量的多线程程序带来极大的便利
  • 阻塞队列-BlockingQueue

    对于Queue而言 xff0c BlockingQueue是主要的线程安全的版本 xff0c 具有阻塞功能 xff0c 可以允许添加 删除元素被阻塞 xff0c 直到成功为止 xff0c blockingqueue相对于Queue而言增加了
  • BlockingQueue深入分析

    1 BlockingQueue 定义的常用方法如下 抛出异常特殊值阻塞超时插入add e offer e put e offer e time unit 移除remove poll take poll time unit 检查element
  • Java 阻塞队列--BlockingQueue

    1 什么是阻塞队列 xff1f 阻塞队列 xff08 BlockingQueue xff09 是一个支持两个附加操作的队列 这两个附加的操作是 xff1a 在队列为空时 xff0c 获取元素的线程会等待队列变为非空 当队列满时 xff0c
  • ArrayBlockingQueue

    在java多线程操作中 BlockingQueue
  • Java基础学习之并发篇:手写阻塞队列ArrayBlockingQueue

    学习目标 我们都知道在并发编程中 阻塞队列在多线程中的场景特别有用 比如在生产和消费者模型中 生产者生产数据到队列 队列满时需要阻塞线程 停止生产 消费者消费队列 对队列为空时阻塞线程停止消费 在Java中有提供不同场景的阻塞队列 那么接下
  • Feign 使用 @SpringQueryMap 来解决多参数传递问题

    本文目录 1 Feign传递一个bean对象参数 2 Feign传递一个bean对象参数 多个基本类型参数 3 Feign传递多个基本类型参数 4 Feign传递多个bean对象参数 在实际项目开发过程中 我们使用 Feign 实现了服务与
  • 如何并行等待多个阻塞队列?

    我有两个独立的阻塞队列 客户端通常使用第一个或第二个阻塞队列来检索要处理的元素 在某些情况下 客户端对两个阻塞队列中的元素感兴趣 无论哪个队列首先提供数据 客户端如何并行等待两个队列 您可以尝试使用poll某种循环中的方法 仅在轮询另一个队
  • 有没有办法保存最大 1 MB 的“消息”集合并将结果写入 JSON/CSV 文件

    我有一个阻塞队列 它不断通过某些应用程序获取消息 现在在 asp net 应用程序中 我尝试使用该队列并将输出写入 CSV JSON 文件 在这里 我想保存最多 1MB 的消息 这些消息从阻塞队列接收 然后将其写出 现在再次保存 1MB 的
  • Java中可以使用Semaphore实现阻塞队列吗?

    我想知道是否可以使用Semaphore来实现阻塞队列 在下面的代码中 我使用一个信号量来保护关键部分 并使用另外两个信号量对象来跟踪空槽和已填充对象的数量 public class BlockingQueue private List qu
  • Mediacodec,解码来自服务器的字节数据包并将其渲染在表面上

    我对 MediaCode 有一些问题 我有 3 个组件 解码器 下载器和渲染器 又简单FragmentStreamVideo初始化 SurfaceView 和 Downloader 其他组件 例如渲染器和解码器 在 SurfaceView
  • 生产者消费者 - ExecutorService 和 ArrayBlockingQueue

    我想知道我对使用 ExecutorService 和 ArrayBlockingQueue 的生产者消费者设计的理解是否正确 我知道有不同的方法来实现这个设计 但我想 最终 这取决于问题本身 我必须面对的问题是 我有一个制作人 他从一个大文
  • 如何立即释放在BlockingQueue上等待的线程

    考虑一个BlockingQueue和一些等待的线程poll long TimeUnit 也可能在take 现在队列是空的 需要通知等待线程它们可以停止等待 预期的行为是null退回或申报的InterruptedException throw
  • 多生产者多消费者多线程Java

    我正在尝试生产者 消费者问题的多个生产者 多个消费者用例 我使用 BlockingQueue 在多个生产者 消费者之间共享公共队列 下面是我的代码 Producer import java util concurrent BlockingQ
  • 异步通知 BlockingQueue 有可用项目

    我需要一个Object当某些情况时得到异步通知BlockingQueue有一件物品要赠送 我在 Javadoc 和网络上搜索了一个预制的解决方案 然后我最终得到了我的一个 也许是幼稚的 解决方案 如下 interface QueueWait
  • 为什么 LogWriter 中的竞争条件会导致生产者阻塞? 【并发实践】

    首先 为了防止那些不喜欢读到我已读完的人将问题标记为重复生产者 消费者日志服务以不可靠的方式关闭 https stackoverflow com questions 31626772 producer consumer logging se
  • 如何阻塞直到BlockingQueue为空?

    我正在寻找一种方法来阻止直到BlockingQueue是空的 我知道 在多线程环境下 只要有生产者将物品放入BlockingQueue 可能会出现队列变空 几纳秒后又充满项目的情况 但是 如果只有one生产者 那么它可能希望在停止将项目放入

随机推荐

  • Pytorch维度操作-unsqueeze、squeeze、view与permute

    view 在pytorch中view函数的作用为重构张量的维度 相当于numpy中resize 的功能 a 61 1 2 3 b 61 2 3 4 c 61 3 5 5 d 61 4 5 6 e 61 np array a b c d e
  • 长假余额为零!我用Python做了个中秋国庆双节拼图游戏

    点击上方 菜鸟学Python xff0c 选择 星标 公众号 重磅干货 xff0c 第一时间送达 今年的国庆长假非常长 xff0c 不知不觉已经余额为零 xff01 朋友圈很多晒出游的照片 xff0c 聚会的照片 xff0c 吃吃喝喝真舒服
  • Redis系列学习1-Redis安装启动与基础概念

    Remote Dictionary Server Redis 是一个由 Salvatore Sanfilippo 写的 key value 存储系统 xff0c 是跨平台的非关系型数据库 Redis 是一个开源的使用 ANSI C 语言编写
  • Redis系列学习2-五大类型(String,List,Hash,Set,ZSet)及其常规操作

    Redis的基本操作 Redis默认是有16个数据库 xff0c 默认使用的是第0个数据库 xff0c 可以通过select 切换数据库 xff0c Redis的命令大小写不敏感的 切换数据库 切换数据库 格式 xff1a select i
  • Redis系列学习3-geospatial地理空间

    geospatial 地理空间 可以用来实现定位 附近的人 打车APP上距离计算 距离的实现主要基于经纬度 xff0c 城市的经纬度查询 xff1a http www jsons cn lngcode geoadd 添加地址位置 格式 xf
  • 遗传算法求解TSP旅行商问题

    旅行商问题 旅行商问题 traveling salesman problem TSP 可描述为 已知N个城市之间的相互距离 现有一个商人必须遍访这N个城市 并且每个城市只能访问一次 最后又必须返回出发城市 如何安排他对这些城市的访问次序 使
  • 剑指Offer-面试算法题

    1 二分查找 xff08 递归与非递归实现 xff09 基本算法 xff0c 掌握好循环条件 package com company Description 二分查找 xff08 递归与非递归实现 xff09 Created by Wanb
  • Python爬虫-抓取PC端网易云音乐评论(GUI界面)

    歌曲搜素 网易云音乐网址为 xff1a https music 163 com 思路是进入后输入一个歌曲名 xff0c 点击搜索按钮 xff0c 通过开发者调试工具捕获搜索请求 xff0c 捕获到的数据信息如下 xff1a 所有的歌曲相关信
  • Package cmake is not available, but is referred to by another package.

    inux环境下安装Cmake报错 xff1a Package cmake is not available but is referred to by another package This may mean that the packa
  • 完美数问题

    题目描述 对于一个十进制正整数 xff0c 如果z的每一位数字只可能是1 2 3中的其中一个 xff0c 则称 是完美数 如 123 1 3321都是完美数而5 1234则不是 牛牛想写一个函数f n xff0c 使得其返回最大的不大于n的
  • 围圈抽牌报数问题

    问题描述 米免参加公司司建 xff0c 100个同事围坐圈 xff0c 裁判开始顺时针从头发牌 xff0c 每发3张白牌就会发出1张黑 牌 xff0c 抽到黑牌的人出局 xff0c 每局第N个抽到黑牌的将获得奖励 问如果米免想获得奖品 xf
  • RTX30系列-Ubuntu系统配置与深度学习环境Pytorch配置

    本文完成RTX3090Windows 43 Ubuntu双系统配置 xff0c 并配置深度学习环境 硬件环境为RTX3090 43 Z590主板 xff0c 64GB RAM xff0c 2TB固态 xff0c 8TB存储 Ubuntu系统
  • 【rotors】多旋翼无人机仿真(一)——搭建rotors仿真环境

    rotors 多旋翼无人机仿真 xff08 一 xff09 搭建rotors仿真环境 rotors 多旋翼无人机仿真 xff08 二 xff09 设置飞行轨迹 rotors 多旋翼无人机仿真 xff08 三 xff09 SE3控制 roto
  • JVM内存管理

    JVM内存管理 Java 虚拟机在执行 Java 程序的过程中会把它管理的内存划分成若干个不同的数据区域 xff0c JDK 1 8 和之前的版本的数据区域有所差异 xff0c JDK1 6如下图所示 图片来源 xff1a JavaGuid
  • AQS、Semaphore、CountDownLatch与CyclicBarrier原理及使用方法

    AQS AQS 的全称为 AbstractQueuedSynchronizer xff0c 翻译过来的意思就是抽象队列同步器 这个类在 java util concurrent locks 包下面 xff0c AQS 就是一个抽象类 xff
  • 滑动窗口框架算法

    最长覆盖子串 xff0c 异位词 xff0c 最长无重复子串等等许多子串问题用常规暴力法费时费力 xff0c 一些大佬的解法虽然很强效率很高 xff0c 但是太难想到了 xff0c 这类问题用滑动窗口算法解决非常的快捷简便 滑动窗口算法思想
  • Python-深度学习常用脚本

    记录一些因为在网络训练 xff0c 测试过程中经常用到的一些脚本 1 视频按帧提取 可以从一段视频中截取不同帧的图片 xff0c 并保存至文件夹 需要自己更改视频路径和图片保存路径 import os import cv2 import s
  • Java面试基础(一)

    1 重载与重写 重载就是同样的一个方法能够根据输入数据的不同 xff0c 做出不同的处理 重写就是当子类继承自父类的相同方法 xff0c 输入数据一样 xff0c 但要做出有别于父类的响应时 xff0c 你就要覆盖父类方法不同类型的对象 x
  • 网络篇-传输控制协议TCP

    TCP协议 传输控制协议 xff08 TCP xff0c Transmission Control Protocol xff09 用一句话概括的话 xff0c 它是一种面向连接的 可靠的 基于字节流的传输层通信协议 TCP xff08 传输
  • 阻塞队列-BlockingQueue

    对于Queue而言 xff0c BlockingQueue是主要的线程安全的版本 xff0c 具有阻塞功能 xff0c 可以允许添加 删除元素被阻塞 xff0c 直到成功为止 xff0c blockingqueue相对于Queue而言增加了