BlockingQueue深入分析

2023-05-16

1.BlockingQueue定义的常用方法如下
 抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()不可用不可用

1)add(anObject):anObject加到BlockingQueue,即如果BlockingQueue可以容纳,则返回true,否则招聘异常

2)offer(anObject):表示如果可能的话,anObject加到BlockingQueue,即如果BlockingQueue可以容纳,则返回true,否则返回false.

3)put(anObject):anObject加到BlockingQueue,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

5)take():取走BlockingQueue里排在首位的对象,BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其中:BlockingQueue 不接受null 元素。试图addput 或offer 一个null 元素时,某些实现会抛出NullPointerExceptionnull 被用作指示poll 操作失败的警戒值。 

2、BlockingQueue的几个注意点

【1】BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity,超出此容量,便无法无阻塞地put 附加元素。没有任何内部容量约束的BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。

【2】BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

【3】BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

【4】BlockingQueue 实质上不支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的end-of-stream 或poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。
3、简要概述BlockingQueue常用的四个实现类

1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.

4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

    
其中LinkedBlockingQueueArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.  
下面主要看一下 ArrayBlockingQueue的源码:
public boolean offer(E e) {    
        if (e == null) throw new NullPointerException();    
        final ReentrantLock lock = this.lock;//每个对象对应一个显示的锁    
        lock.lock();//请求锁直到获得锁(不可以被interrupte)    
        try {    
            if (count == items.length)//如果队列已经满了    
                return false;    
            else {    
                insert(e);    
                return true;    
            }    
        } finally {    
            lock.unlock();//    
        }    
}    
看insert方法:    
private void insert(E x) {    
        items[putIndex] = x;    
        //增加全局index的值。    
        /*   
        Inc方法体内部:   
        final int inc(int i) {   
        return (++i == items.length)? 0 : i;   
            }   
        这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。如果插完了,putIndex可能重新变为0(在已经执行了移除操作的前提下,否则在之前的判断中队列为满)   
        */   
        putIndex = inc(putIndex);     
        ++count;    
        notEmpty.signal();//wake up one waiting thread    
}    

public void put(E e) throws InterruptedException {    
        if (e == null) throw new NullPointerException();    
        final E[] items = this.items;    
        final ReentrantLock lock = this.lock;    
        lock.lockInterruptibly();//请求锁直到得到锁或者变为interrupted    
        try {    
            try {    
                while (count == items.length)//如果满了,当前线程进入noFull对应的等waiting状态    
                    notFull.await();    
            } catch (InterruptedException ie) {    
                notFull.signal(); // propagate to non-interrupted thread    
                throw ie;    
            }    
            insert(e);    
        } finally {    
            lock.unlock();    
        }    
}    

public boolean offer(E e, long timeout, TimeUnit unit)    
        throws InterruptedException {    
   
        if (e == null) throw new NullPointerException();    
    long nanos = unit.toNanos(timeout);    
        final ReentrantLock lock = this.lock;    
        lock.lockInterruptibly();    
        try {    
            for (;;) {    
                if (count != items.length) {    
                    insert(e);    
                    return true;    
                }    
                if (nanos <= 0)    
                    return false;    
                try {    
                //如果没有被 signal/interruptes,需要等待nanos时间才返回    
                    nanos = notFull.awaitNanos(nanos);    
                } catch (InterruptedException ie) {    
                    notFull.signal(); // propagate to non-interrupted thread    
                    throw ie;    
                }    
            }    
        } finally {    
            lock.unlock();    
        }    
    }    

public boolean add(E e) {    
    return super.add(e);    
}    
父类:    
public boolean add(E e) {    
        if (offer(e))    
            return true;    
        else   
            throw new IllegalStateException("Queue full");    
    }  
该类中有几个实例变量: takeIndex/putIndex/count
用三个数字来维护这个队列中的数据变更:    
    /** items index for next take, poll or remove */   
    private int takeIndex;    
    /** items index for next put, offer, or add. */   
    private int putIndex;    
    /** Number of items in the queue */   
    private int count;    
转自:http://blog.csdn.net/vernonzheng/article/details/8247564
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

BlockingQueue深入分析 的相关文章

  • BlockingQueue深入分析

    1 BlockingQueue 定义的常用方法如下 抛出异常特殊值阻塞超时插入add e offer e put e offer e time unit 移除remove poll take poll time unit 检查element
  • BlockingQueue

    BlockingQueue 一 阻塞队列基本方法介绍 谈到线程池 xff0c 不得不谈到生产者 消费者模式 xff0c 谈到生产者 消费者 xff0c 就不得不谈到对应的数据结构 xff0c 谈到对应的数据结构不得不言 BlockingQu
  • 【Python】中文乱码问题与解决方案 深入分析

    一直以来 xff0c python中的中文编码就是一个极为头大的问题 xff0c 经常抛出编码转换的异常 xff0c python中的str和unicode到底是一个什么东西呢 xff1f 在本文中 xff0c 以 39 哈 39 来解释作
  • 深入分析OpenFlow协议

    文章目录 OpenFlow是什么 xff1f xff1f xff1f OpenFlow的起源与发展OpenFlow工作原理OpenFlow组件OpenFlow控制器1 NOX POX2 ONOS3 OpenDaylight OpenFlow
  • 阻塞队列-BlockingQueue

    对于Queue而言 xff0c BlockingQueue是主要的线程安全的版本 xff0c 具有阻塞功能 xff0c 可以允许添加 删除元素被阻塞 xff0c 直到成功为止 xff0c blockingqueue相对于Queue而言增加了
  • BlockingQueue深入分析

    1 BlockingQueue 定义的常用方法如下 抛出异常特殊值阻塞超时插入add e offer e put e offer e time unit 移除remove poll take poll time unit 检查element
  • Java 阻塞队列--BlockingQueue

    1 什么是阻塞队列 xff1f 阻塞队列 xff08 BlockingQueue xff09 是一个支持两个附加操作的队列 这两个附加的操作是 xff1a 在队列为空时 xff0c 获取元素的线程会等待队列变为非空 当队列满时 xff0c
  • ArrayBlockingQueue

    在java多线程操作中 BlockingQueue
  • Feign 使用 @SpringQueryMap 来解决多参数传递问题

    本文目录 1 Feign传递一个bean对象参数 2 Feign传递一个bean对象参数 多个基本类型参数 3 Feign传递多个基本类型参数 4 Feign传递多个bean对象参数 在实际项目开发过程中 我们使用 Feign 实现了服务与
  • 如何并行等待多个阻塞队列?

    我有两个独立的阻塞队列 客户端通常使用第一个或第二个阻塞队列来检索要处理的元素 在某些情况下 客户端对两个阻塞队列中的元素感兴趣 无论哪个队列首先提供数据 客户端如何并行等待两个队列 您可以尝试使用poll某种循环中的方法 仅在轮询另一个队
  • 有没有办法保存最大 1 MB 的“消息”集合并将结果写入 JSON/CSV 文件

    我有一个阻塞队列 它不断通过某些应用程序获取消息 现在在 asp net 应用程序中 我尝试使用该队列并将输出写入 CSV JSON 文件 在这里 我想保存最多 1MB 的消息 这些消息从阻塞队列接收 然后将其写出 现在再次保存 1MB 的
  • Java中可以使用Semaphore实现阻塞队列吗?

    我想知道是否可以使用Semaphore来实现阻塞队列 在下面的代码中 我使用一个信号量来保护关键部分 并使用另外两个信号量对象来跟踪空槽和已填充对象的数量 public class BlockingQueue private List qu
  • Mediacodec,解码来自服务器的字节数据包并将其渲染在表面上

    我对 MediaCode 有一些问题 我有 3 个组件 解码器 下载器和渲染器 又简单FragmentStreamVideo初始化 SurfaceView 和 Downloader 其他组件 例如渲染器和解码器 在 SurfaceView
  • 生产者消费者 - ExecutorService 和 ArrayBlockingQueue

    我想知道我对使用 ExecutorService 和 ArrayBlockingQueue 的生产者消费者设计的理解是否正确 我知道有不同的方法来实现这个设计 但我想 最终 这取决于问题本身 我必须面对的问题是 我有一个制作人 他从一个大文
  • 如何立即释放在BlockingQueue上等待的线程

    考虑一个BlockingQueue和一些等待的线程poll long TimeUnit 也可能在take 现在队列是空的 需要通知等待线程它们可以停止等待 预期的行为是null退回或申报的InterruptedException throw
  • 多生产者多消费者多线程Java

    我正在尝试生产者 消费者问题的多个生产者 多个消费者用例 我使用 BlockingQueue 在多个生产者 消费者之间共享公共队列 下面是我的代码 Producer import java util concurrent BlockingQ
  • 异步通知 BlockingQueue 有可用项目

    我需要一个Object当某些情况时得到异步通知BlockingQueue有一件物品要赠送 我在 Javadoc 和网络上搜索了一个预制的解决方案 然后我最终得到了我的一个 也许是幼稚的 解决方案 如下 interface QueueWait
  • 如何阻塞直到BlockingQueue为空?

    我正在寻找一种方法来阻止直到BlockingQueue是空的 我知道 在多线程环境下 只要有生产者将物品放入BlockingQueue 可能会出现队列变空 几纳秒后又充满项目的情况 但是 如果只有one生产者 那么它可能希望在停止将项目放入
  • 如何为信号量中等待的线程提供优先级?

    我使用信号量来限制访问函数的线程数量 我希望接下来要唤醒的线程应该由我将给出的某个优先级选择 而不是默认信号量唤醒它们的方式 我们怎样才能做到这一点 这是实现 class MyMathUtil2 implements Runnable do
  • LinkedBlockingQueue 抛出 InterruptedException

    我有这段代码 ALinkedBlockingQueue应该只抛出一个Exception如果在等待添加到队列时被中断 但这个队列是无限的 所以它应该尽快添加 为什么我的关闭方法会抛出一个InterruptedException private

随机推荐