BlockingQueue深入分析

2023-05-16

1.BlockingQueue定义的常用方法如下
 抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()不可用不可用

1)add(anObject):anObject加到BlockingQueue,即如果BlockingQueue可以容纳,则返回true,否则招聘异常

2)offer(anObject):表示如果可能的话,anObject加到BlockingQueue,即如果BlockingQueue可以容纳,则返回true,否则返回false.

3)put(anObject):anObject加到BlockingQueue,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

5)take():取走BlockingQueue里排在首位的对象,BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其中:BlockingQueue 不接受null 元素。试图addput 或offer 一个null 元素时,某些实现会抛出NullPointerExceptionnull 被用作指示poll 操作失败的警戒值。 

2、BlockingQueue的几个注意点

【1】BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity,超出此容量,便无法无阻塞地put 附加元素。没有任何内部容量约束的BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。

【2】BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

【3】BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

【4】BlockingQueue 实质上不支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的end-of-stream 或poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。
3、简要概述BlockingQueue常用的四个实现类

1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.

4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

    
其中LinkedBlockingQueueArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.  
下面主要看一下 ArrayBlockingQueue的源码:
public boolean offer(E e) {    
        if (e == null) throw new NullPointerException();    
        final ReentrantLock lock = this.lock;//每个对象对应一个显示的锁    
        lock.lock();//请求锁直到获得锁(不可以被interrupte)    
        try {    
            if (count == items.length)//如果队列已经满了    
                return false;    
            else {    
                insert(e);    
                return true;    
            }    
        } finally {    
            lock.unlock();//    
        }    
}    
看insert方法:    
private void insert(E x) {    
        items[putIndex] = x;    
        //增加全局index的值。    
        /*   
        Inc方法体内部:   
        final int inc(int i) {   
        return (++i == items.length)? 0 : i;   
            }   
        这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。如果插完了,putIndex可能重新变为0(在已经执行了移除操作的前提下,否则在之前的判断中队列为满)   
        */   
        putIndex = inc(putIndex);     
        ++count;    
        notEmpty.signal();//wake up one waiting thread    
}    

public void put(E e) throws InterruptedException {    
        if (e == null) throw new NullPointerException();    
        final E[] items = this.items;    
        final ReentrantLock lock = this.lock;    
        lock.lockInterruptibly();//请求锁直到得到锁或者变为interrupted    
        try {    
            try {    
                while (count == items.length)//如果满了,当前线程进入noFull对应的等waiting状态    
                    notFull.await();    
            } catch (InterruptedException ie) {    
                notFull.signal(); // propagate to non-interrupted thread    
                throw ie;    
            }    
            insert(e);    
        } finally {    
            lock.unlock();    
        }    
}    

public boolean offer(E e, long timeout, TimeUnit unit)    
        throws InterruptedException {    
   
        if (e == null) throw new NullPointerException();    
    long nanos = unit.toNanos(timeout);    
        final ReentrantLock lock = this.lock;    
        lock.lockInterruptibly();    
        try {    
            for (;;) {    
                if (count != items.length) {    
                    insert(e);    
                    return true;    
                }    
                if (nanos <= 0)    
                    return false;    
                try {    
                //如果没有被 signal/interruptes,需要等待nanos时间才返回    
                    nanos = notFull.awaitNanos(nanos);    
                } catch (InterruptedException ie) {    
                    notFull.signal(); // propagate to non-interrupted thread    
                    throw ie;    
                }    
            }    
        } finally {    
            lock.unlock();    
        }    
    }    

public boolean add(E e) {    
    return super.add(e);    
}    
父类:    
public boolean add(E e) {    
        if (offer(e))    
            return true;    
        else   
            throw new IllegalStateException("Queue full");    
    }  
该类中有几个实例变量: takeIndex/putIndex/count
用三个数字来维护这个队列中的数据变更:    
    /** items index for next take, poll or remove */   
    private int takeIndex;    
    /** items index for next put, offer, or add. */   
    private int putIndex;    
    /** Number of items in the queue */   
    private int count;    
转自:http://blog.csdn.net/vernonzheng/article/details/8247564
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

BlockingQueue深入分析 的相关文章

  • 07_阻塞队列(BlockingQueue)

    目录 1 什么是BlockingQueue 2 认识BlockingQueue 3 代码演示 栈与队列概念 栈 Stack xff1a 先进后出 xff0c 后进先出 队列 xff1a 先进先出 1 什么是BlockingQueue 在多线
  • 手机开发之三:CoreApp的深入分析

    四 xff0e CoreApp的深入分析 目前参考代码里面的CoreApp所完成的工作比较多且杂 xff0c 主要说来有如下几件事 a 系统组件初始化 xff1b b 开机Logo的显示 xff1b c Sim卡检测和Pin码校验 xff1
  • BlockingQueue深入分析

    1 BlockingQueue 定义的常用方法如下 抛出异常特殊值阻塞超时插入add e offer e put e offer e time unit 移除remove poll take poll time unit 检查element
  • BlockingQueue

    BlockingQueue 一 阻塞队列基本方法介绍 谈到线程池 xff0c 不得不谈到生产者 消费者模式 xff0c 谈到生产者 消费者 xff0c 就不得不谈到对应的数据结构 xff0c 谈到对应的数据结构不得不言 BlockingQu
  • 【Python】中文乱码问题与解决方案 深入分析

    一直以来 xff0c python中的中文编码就是一个极为头大的问题 xff0c 经常抛出编码转换的异常 xff0c python中的str和unicode到底是一个什么东西呢 xff1f 在本文中 xff0c 以 39 哈 39 来解释作
  • 深入分析OpenFlow协议

    文章目录 OpenFlow是什么 xff1f xff1f xff1f OpenFlow的起源与发展OpenFlow工作原理OpenFlow组件OpenFlow控制器1 NOX POX2 ONOS3 OpenDaylight OpenFlow
  • 阻塞队列-BlockingQueue

    对于Queue而言 xff0c BlockingQueue是主要的线程安全的版本 xff0c 具有阻塞功能 xff0c 可以允许添加 删除元素被阻塞 xff0c 直到成功为止 xff0c blockingqueue相对于Queue而言增加了
  • Java 阻塞队列--BlockingQueue

    1 什么是阻塞队列 xff1f 阻塞队列 xff08 BlockingQueue xff09 是一个支持两个附加操作的队列 这两个附加的操作是 xff1a 在队列为空时 xff0c 获取元素的线程会等待队列变为非空 当队列满时 xff0c
  • 【散文诗】C语言的本质(基于ARM深入分析C程序)

    文章目录 1 ARM架构ARM通用寄存器及其别名基本汇编指令LDR xff1a STR xff1a ADD xff1a SUB xff1a BL xff1a PUSH xff1a POP xff1a MOV xff1a 2 局部变量的分配与
  • ArrayBlockingQueue

    在java多线程操作中 BlockingQueue
  • Feign 使用 @SpringQueryMap 来解决多参数传递问题

    本文目录 1 Feign传递一个bean对象参数 2 Feign传递一个bean对象参数 多个基本类型参数 3 Feign传递多个基本类型参数 4 Feign传递多个bean对象参数 在实际项目开发过程中 我们使用 Feign 实现了服务与
  • 如何并行等待多个阻塞队列?

    我有两个独立的阻塞队列 客户端通常使用第一个或第二个阻塞队列来检索要处理的元素 在某些情况下 客户端对两个阻塞队列中的元素感兴趣 无论哪个队列首先提供数据 客户端如何并行等待两个队列 您可以尝试使用poll某种循环中的方法 仅在轮询另一个队
  • 有没有办法保存最大 1 MB 的“消息”集合并将结果写入 JSON/CSV 文件

    我有一个阻塞队列 它不断通过某些应用程序获取消息 现在在 asp net 应用程序中 我尝试使用该队列并将输出写入 CSV JSON 文件 在这里 我想保存最多 1MB 的消息 这些消息从阻塞队列接收 然后将其写出 现在再次保存 1MB 的
  • Java中可以使用Semaphore实现阻塞队列吗?

    我想知道是否可以使用Semaphore来实现阻塞队列 在下面的代码中 我使用一个信号量来保护关键部分 并使用另外两个信号量对象来跟踪空槽和已填充对象的数量 public class BlockingQueue private List qu
  • 生产者消费者 - ExecutorService 和 ArrayBlockingQueue

    我想知道我对使用 ExecutorService 和 ArrayBlockingQueue 的生产者消费者设计的理解是否正确 我知道有不同的方法来实现这个设计 但我想 最终 这取决于问题本身 我必须面对的问题是 我有一个制作人 他从一个大文
  • 如何立即释放在BlockingQueue上等待的线程

    考虑一个BlockingQueue和一些等待的线程poll long TimeUnit 也可能在take 现在队列是空的 需要通知等待线程它们可以停止等待 预期的行为是null退回或申报的InterruptedException throw
  • 多生产者多消费者多线程Java

    我正在尝试生产者 消费者问题的多个生产者 多个消费者用例 我使用 BlockingQueue 在多个生产者 消费者之间共享公共队列 下面是我的代码 Producer import java util concurrent BlockingQ
  • 异步通知 BlockingQueue 有可用项目

    我需要一个Object当某些情况时得到异步通知BlockingQueue有一件物品要赠送 我在 Javadoc 和网络上搜索了一个预制的解决方案 然后我最终得到了我的一个 也许是幼稚的 解决方案 如下 interface QueueWait
  • 为什么 LogWriter 中的竞争条件会导致生产者阻塞? 【并发实践】

    首先 为了防止那些不喜欢读到我已读完的人将问题标记为重复生产者 消费者日志服务以不可靠的方式关闭 https stackoverflow com questions 31626772 producer consumer logging se
  • 如何阻塞直到BlockingQueue为空?

    我正在寻找一种方法来阻止直到BlockingQueue是空的 我知道 在多线程环境下 只要有生产者将物品放入BlockingQueue 可能会出现队列变空 几纳秒后又充满项目的情况 但是 如果只有one生产者 那么它可能希望在停止将项目放入

随机推荐

  • 消息队列技术介绍

    一 消息队列概述 消息队列中间件是分布式系统中重要的组件 xff0c 主要解决应用耦合 异步消息 流量削锋等问题 实现高性能 高可用 可伸缩和最终一致性架构 是大型分布式系统不可缺少的中间件 目前在生产环境 xff0c 使用较多的消息队列有
  • Ubuntu 循环登录 解决办法

    Ubuntu 经常出现循环登录的情况 但需要根据不同原因 采用不同的解决方案 已知情况有 Xauthority 权限变为root 修改到自己账户权限即可nvidia 显卡驱动问题 卸载重装即可 Xauthority 权限问题 参考Ubunt
  • Kotlin基础(一)android studio中配置Kotlin

    1 何为Kotlin xff1f Kotlin是一门运行在JVM之上的语言 它由Jetbrains创建 xff0c 而Jetbrains则是诸多强大的工具 xff08 如知名的Java IDE IntelliJ IDEA xff09 背后的
  • 关于Ubuntu18.04 root账户登录的问题

    关于Ubuntu18 04 root账户登录的问题 一 Ubuntu 18 04添加root用户登录1 设置root用户2 修改 root profile3 修改 96 etc pam d 96 目录下的 96 gdm autologin
  • Ubuntu下fcitx崩溃,搜狗输入法乱码

    转载 xff1a https www findhao net res 786 预防原文删除 xff0c 侵删 方法 直接重启fcitx即可 xff1a fictx自带的重启 fcitx r 或者执行以下三条 xff1a 获得fcitx的进程
  • KVM虚拟化

    KVM虚拟化 文章目录 KVM虚拟化虚拟化简介 KVMKVM部署CPU虚拟化功能kvm管理界面安装 虚拟化简介 虚拟化 xff1a 在一台计算机上虚拟出多个逻辑的计算机 xff0c 而且每个逻辑计算机它可以是不同的操作系统 虚拟化技术 xf
  • Android系统Camera图片反转的一个问题

    一 问题提出 目前遇到项目问题 xff0c Camera预览图像是反的 xff0c 于是考虑设置180度反转以便正常 通过如下两种方式 xff1a params setRotation 180 java部分 p set CameraPara
  • I2C总线的SDA和SCL

    串行数据线SDA 负责在设备间传输串行数据 串行时钟线SCL 负责产生同步时钟脉冲 SCL SDA是I2C总线的信号线 I2C总线是共享的总线系统 xff0c 因此可以将多个I2C设备连接到该系统上 连接到I2C中总线上的设备既可以用作主设
  • ubuntu18.04输入密码登录不进去一直循环

    我是把ubuntu分辨率调了之后变成这样的 其实这个解决方法我也很无语 自己乱按弄到的 就是在登录的隔壁 xff0c 勾选第二个就可以了QUQ 评论区所知 xff1a 选择这个话 xff0c 会关掉nvidia显卡 选择wayland进入后
  • rhce2

    1 配置chrony时间服务器 xff0c 确保客户端主机能和服务主机同步时间 两台机器 第一台机器作为时间服务器从ntp aliyun com同步时间 xff08 注意包含意外情况 xff0c 如果不能上外网 xff0c 不能从阿里云同步
  • FFmpeg Android编译运行出现 Abort message: 'JniInvocation instance already initialized'

    signal 5 SIGTRAP code 1 TRAP BRKPT fault addr 0x272000000d03 Abort message JniInvocation instance already initialized 把
  • layui图标用法总结

    本文参考官方文档 layui图标使用官方文档 xff0c 建议先看此文章 xff0c 并结合本文的前两个步骤使用即可 1 下载layui js相关文档 xff0c layui js下载 xff0c 下载之后里面的内容如下 xff1a 只需要
  • Eggjs学习系列(一) 使用TypeScript快速入门

    Eggjs学习系列 xff08 一 xff09 使用TypeScript快速入门 Eggjs是一个node的渐近式开发框架 xff0c 用于服务端开发 而 TypeScript 是 JavaScript的超集 xff0c 在兼容 JavaS
  • Golang实现小型CMS内容管理功能(二):前端接入百度ueditor富文本编辑器

    当我们把接口都做好以后 xff0c 我们需要去开发前端界面 添加文章功能里面 xff0c 最重要的就是文章内容部分 xff0c 需要配置上富文本编辑器 xff0c 这样才能给我们的内容增加样式 下载ueditor代码 ueditor已经很久
  • 网络分析中数据包结构(含七层模型)

    七层模型 xff1a 包 Packet 是TCP IP协议通信传输中的数据单位 xff0c 一般也称 数据包 有人说 xff0c 局域网中传输的不是 帧 Frame 吗 xff1f 没错 xff0c 但是TCP IP协议是工作在OSI模型第
  • ubuntu下PyCharm遇到问题

    第三方库没有自动补全功能 xff08 autocomplete xff09 190921补充 xff1a 这个问题就是环境配置的问题当初真是无知 原因 xff1a PyCharm的人工编译环境和程序的运行环境不是同一个 xff08 说的太不
  • 用java简单的实现单链表的基本操作

    此代码仅供参考 xff0c 如有疑问欢迎评论 xff1a package com tyxh link 节点类 public class Node protected Node next 指针域 protected int data 数据域
  • 算法:海量日志数据,提取出某日访问百度次数最多的那个IP

    首先是这一天 xff0c 并且是访问百度的日志中的IP取出来 xff0c 逐个写入到一个大文件中 注意到IP是32位的 xff0c 最多有个2 32个IP 同样可以采用映射的方法 xff0c 比如模1000 xff0c 把整个大文件映射为1
  • 使用JUnit测试预期异常

    开发人员常常使用单元测试来验证的一段儿代码的操作 xff0c 很多时候单元测试可以检查抛出预期异常 expected exceptions 的代码 在Java语言中 xff0c JUnit是一套标准的单元测试方案 xff0c 它提供了很多验
  • BlockingQueue深入分析

    1 BlockingQueue 定义的常用方法如下 抛出异常特殊值阻塞超时插入add e offer e put e offer e time unit 移除remove poll take poll time unit 检查element