Kotlin进阶-7-阻塞队列+线程池

2023-05-16

目录

1、阻塞队列

1.1、常见阻塞场景

2、Java中的阻塞队列

2.1、ArrayBlockingQueue

2.2、LinkedBlockingQueue

2.3、PriorityBlockingQueue

2.4、DelayQueue

2.5、SynchronousQueue

2.6、LinkedTransferQueue

2.7、LinkedBlockingDeque

3、阻塞队列源码解析

4、阻塞队列使用场景

5、线程池

5.1、线程池的优势

5.2、ThreadPoolExecutor

5.3、线程池的执行流程和原理

5.4、线程池为什么要使用阻塞队列而不使用非阻塞队列?

6、线程池的种类

6.1、FixedThreadPool

6.2、CachedThreadPool

6.3、SingleThreadExecutor

6.4、ScheduledThreadPool


1、阻塞队列

阻塞队列常用于生产者和消费者的场景,生产者是往队列中添加元素的线程,消费者是从队列中拿元素的线程。

阻塞队列就是生产者存放元素的容器,而消费者也只从容器中拿元素。

1.1、常见阻塞场景

阻塞队列和普通队列不同地方在于,它会阻塞线程,而常见的阻塞场景有如下两种:

1、当队列中没有数据的情况下,消费者端的所有线程都会被阻塞(挂起),直到有数据放入到队列中。

2、当队列中填满数据的情况下,生产者端的所有线程都会被阻塞(挂起),直到队列中有空的位置,这些线程会被自动唤醒。

2、Java中的阻塞队列

Java中为我们提供了七个阻塞队列,他们分别如下:

2.1、ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。

我们可以使用以下代码创建一个公平的阻塞队列。

    val arrayBlockingQueue=ArrayBlockingQueue<Any>(1000,true)

2.2、LinkedBlockingQueue

基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。

当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;

只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

2.3、PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。继承Comparable类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

看如下示例:输出结果会按照compareTo的比较顺序进行输出。

2.4、DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

2.5、SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。

使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。

    val queue: SynchronousQueue<Any> = SynchronousQueue<Any>(true)

2.6、LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

(1)transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

(2)tryTransfer方法

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。

对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

2.7、LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。

所谓双向队列指的是可以从队列的两端插入和移出元素。

双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。

另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。

在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。

另外,双向阻塞队列可以运用在“工作窃取”模式中。

3、阻塞队列源码解析

这里我们看一下ArrayBlockingQueue源码:

items:ArrayBlockingQueue维护着一个Object类型的数组;

takeIndex和putIndex:表示对头元素和队尾元素的下标,也就是被取出和插入那端下标;

count:表示数组中存储元素的数量;

lock:是一个可重入锁;详细了解可以看一下上一节:Kotlin进阶-6-重入锁+synchronized+volatile

notEmpty和notFull:是等待条件。notEmpty是控制消费者线程的等待条件,notFull是控制生产者线程的等待条件。

接下来我们看一下put()方法--------------------------------------------------->

从put()方法的实现我们可以看出,它首先拿到了锁,并且获取的是可中断锁,然后判断当前数据存储的数量是否达到了数组的长度,如果达到了,则调用notFull.await()阻塞当前线程,等待元素数量减少并且被其他线程通过调用signal()来唤醒。

看下面代码,当我们从满的队列中取出了一个元素之后,他就会通过signal()方法来唤醒前面阻塞的生产者线程中的其中一个,来往队列中添加元素。

如果数组没满,就会通过----------------------------->enqueue()   方法插入元素。

enqueue()的实现很简单,就是往items数组中添加了一个新的元素,同时它会唤醒一个消费者线程来消费该元素。

我们再来看看怎么从队列中取元素 -----------------------------> take()

take()方法的实现和put()方法类似,都是先获取到锁,然后如果当前数组中没有一个元素的话,就让消费者线程进行等待,这时候就需要添加元素的enqueue()方法来唤醒这些等待的消费者线程。

如果有元素的话,就去enqueue()方法中去获取该元素,同时唤醒生产者线程去添加元素。

4、阻塞队列使用场景

除了线程池使用了阻塞队列之外,生产者消费者模式也常常会使用到阻塞队列。

看如下代码:你会发现利用阻塞队列实现的生产者和消费者模式会很简洁。而且我们不需要考虑同步和线程间的通信问题。

5、线程池

在编程中经常使用线程来异步处理任务,比如网络请求,或者下载图片等,但是每个线程的创建和销毁都需要一定的开销。

如果每次执行一个异步任务都开启一个新的线程的话,那这些线程的创建和销毁将带来很大的资源消耗,而且每个线程执行过程中你都没控制,这个时候就需要线程池的出现,来管理这些单个线程了。

5.1、线程池的优势

  1. 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
  2. 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
  3. 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。
  4. 提供更强大的功能,延时定时线程池。

5.2、ThreadPoolExecutor

在Executor框架中最核心的成员就是ThreadPoolExecutor,它是线程池的核心实现类,它的主要参数如下:

corePoolSize核心线程数):当默认情况下,线程池是空的,当向线程池提交一个任务时,若线程池中已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数等于corePoolSize时,(除了利用提交新任务来创建和启动线程(按需构造),也可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程池中的所有核心线程。)

maximumPoolSize线程池允许创建的线程数最大大小):线程池所允许的最大线程个数。其中包括核心线程和非核心线程,当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。

keepAliveTime非核心线程闲置的超时时间):非核心线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁。如果设置allowCoreThreadTimeOut=true,那么这个超时时间也会应用到核心线程上

unitkeepAliveTime非核参数的时间单位,可以是 天、小时、分钟、秒、毫秒等。

workQueue任务队列):如果当前消费任务的核心线程全部启动起来,这时候就会把任务放入到该队列中,该队列是BlockingQueue类型的,也就是阻塞队列。

threadFactory线程工厂):可以用线程工厂为每个创建出来的线程设置名字,一般情况无需设置该参数。

RejectedExecutorHandler(线程饱和策略):当线程池和任务队列都满了,再提交任务就会执行该策略,默认情况下是AbordPolicy,表示无法处理该情况,并抛出RejectedExecutorException异常。

另外还有:CallerRunsPolicy:用调用者所在的线程来处理多余的任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

DiscardPolicy:不能指向多余的任务,并将该任务删除;

DiscardOldestPolicy:丢弃最近添加到队列的任务,并执行当前的任务。

5.3、线程池的执行流程和原理

5.4、线程池为什么要使用阻塞队列而不使用非阻塞队列?

阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。使得在线程不至于一直占用cpu资源。

6、线程池的种类

通过直接或者间接地配置ThreadPoolExecutor的参数可以创建出不同类型的线程池;其中比较常见的有4种:

6.1、FixedThreadPool

FixedThreadPool是可重用固定线程数的线程池。它的创建如下:

corePoolSizemaximumPoolSize都设置为了相同的数量,(核心线程数==最大的线程数量)这就说明了,该线程池只有核心线程数,keepAliveTime设置为0L,表示如果创建了非核心线程会被立即销毁。另外任务队列采用了无界的阻塞队列LinkedBlockingQueue

它的执行流程如下:没有非核心线程的步骤。

6.2、CachedThreadPool

CachedThreadPool是根据任务数量创建线程的线程池;它的实现如下:

看上面的参数:corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE都这说明该线程池,没有核心线程数。而且这里使用了不存储元素的阻塞队列SynchronouesQueue,这就说明我们往该线程池添加的所有任务,都只会交给非核心线程来处理,而且如果同时添加的数量过大,我们创建的线程数量也会很大。

它比较适合处理大量需要立即处理并且耗时很少的任务。

6.3、SingleThreadExecutor

SingleThreadExecutor是使用单个线程的线程池:

corePoolSizemaximumPoolSize都设置为1,那就是该线程池只会有一个线程了。

6.4、ScheduledThreadPool

ScheduledThreadPool是一个能实现定时和周期性处理任务的线程池:

这里可以看到,核心线程数=你设置的数量,线程池的线程总数是Integer的最大值,但是这里使用了无界的DelayedWorkQueue阻塞队列,说明该线程池不会创建非核心线程,超过核心线程数量的任务都会被添加到该无界队列中。

该线程池通过执行scheduleAtFixedRatescheduleWithFixedDelay来提交任务,在该方法中会将任务包装成ScheduledFutureTask 任务,然后添加到DelayWorkQueue中,DelayWorkQueue会将任务按照时间顺序进行排序。

 

 

 

 

 

 

 

 

 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kotlin进阶-7-阻塞队列+线程池 的相关文章

随机推荐

  • 2023年最新版kali linux安装教程

    一 前期准备 前排提醒 xff0c 文末有绿色版安装包免费领取 xff01 二 VMware虚拟机配置 1 打开vmware xff0c 点击创建新的虚拟机 2 选择自定义 高级 选项 xff0c 点击下一步 3 继续下一步 4 选择 稍后
  • CentOS7.1下 安装vncserver和删除vnc占有的端口

    今天给两台新服务器装CentOs7 1系统 xff0c 然后装VNCServer的时候感觉网上的教程要么复杂多此一举 xff0c 要么不清楚 xff0c 关于 list端口的部分都没讲 所以这里整理一下 xff0c 按着下面的顺序来就可以了
  • mac使用虚拟机(VirtualBox+centos7)搭建kubernetes(K8S)集群

    文章目录 说明一 环境准备1 配置主机网络2 配置磁盘空间3 安装虚拟机配置网络4 设置Linux环境 三台均需要设置 二 安装docker kubeadm kubelet kubectl 三台均需要设置 1 安装docker环境2 kub
  • .NET6入门:1.Windows开发环境搭建

    作为 NET的最新版本 NET6长期支持版已经发布 xff0c NET6宣称是迄今为止最快的 NET 那当然不能落下时代的潮流 xff0c 就让我们跟着文章进入 NET6的世界吧 1 NET6SDK下载 Download NET Linux
  • 音视频编解码原理(一) 封装格式和编码方式简介

    一 封装格式 要了解音视频编解码原理 xff0c 首先需要知道什么是封装格式 xff1f 所谓封装格式 xff0c 就是将已经编码压缩好的视频轨和音频轨按照一定的格式封装到一个文件中 xff0c 一般情况下 xff0c 不同的封装格式对应不
  • VS调试方法总结(二)

    通过结构化异常定位崩溃程序 程序崩溃时 xff0c 生成文本文件 xff0c 记录崩溃得堆栈信息 直接上代码 已经编译通过 xff0c 拷贝直接可用 h include lt Windows h gt include lt stdarg h
  • QT(C++) + OpenCV + Python库打包发布可执行EXE

    QT xff08 C 43 43 xff09 43 OpenCV 43 Python库打包发布可执行EXE 背景 最近写了一个操作界面 xff0c 不仅用到了OpenCV的函数 xff0c 还调用了一个python脚本 xff0c 所以这里
  • Linux 内存管理 页回收和swap机制

    页高速缓存和页写回机制 页是物理内存或虚拟内存中一组连续的线性地址 xff0c Linux内核以页为单位处理内存 xff0c 页的大小通常是4KB 当一个进程请求一定量的页面时 xff0c 如果有可用的页面 xff0c 内核会直接把这些页面
  • docker 创建 network,出现异常问题及解决

    最近在使用 docker 创建 network 时 xff0c 出现错误 xff1a Error response from daemon could not find plugin bridge in v1 plugin registry
  • 工作进度所占总进度的比例

    如果实现的功能模块全部完成为 或者说 工作进度 xff1a 100 那么 xff1a 1 产品原型全部完成 包括文档的整理 xff1a 15 2 UI设计全部完成 xff1a 10 3 后台全部完成 xff1a 25 4 前台全部完成 xf
  • Docx4J替换内容时,内容换行失败问题解决

    WordprocessingMLPackage wordMLPackage 61 WordprocessingMLPackage load new java io File templatePath MainDocumentPart doc
  • C语言经典例题-用4×4矩阵显示从1到16的所有整数,并计算每行、每列和每条对角线上的和

    编写一个程序 xff0c 要求用户 按任意次序 xff09 输入从1到16的所有整数 xff0c 然后用4 4矩阵的形式将它们显示出来 再计算出每行 每列和每条对角线上的和 include lt stdio h gt int main in
  • java中Map.hashCode()函数说明

    在java中 xff0c Map hashCode 函数是在具有一定工作积累后 xff0c 为了更好的成长不可避免需要研究的内容 首先 xff0c 我们先看下原始代码 xff1a static final int hash Object k
  • 不支持的特性: getMetaData,问题解决

    最近使用springboot 43 mybatis 时遇到 xff1a 不支持的特性 getMetaData 的异常 使用 xff1a 64 Options useGeneratedKeys 61 false 解决的该问题 xff1b 官方
  • jsp四种范围

    page代表是与一个页面相关的对象和属性 request代表是与 Web 客户机发出一个请求相关的对象和属性 session代表是与用于某个 Web 客户机的一个用户体验相关的对象和属性 application代表是与整个 Web 应用程序
  • Kotlin-17-等号比较(== 、===)

    目录 1 Java中的 61 61 2 Java中的 equals 3 两者的区别 3 对于基本数据类型的 61 61 比较 4 Kotlin中的 61 61 与 61 61 61 1 Java中的 61 61 Java中的 61 61 直
  • Kotlin-30-继承多个父类

    目录 1 Java中的继承 2 Kotlin中的继承 1 Java中的继承 Java中的类只能继承一个父类 xff0c 是无法实现继承多个父类 xff0c 但是一个类可以实现多个接口 Java中的接口是无法给函数添加函数体的 abstrac
  • 算法-9-快速排序

    目录 1 描述 2 特点 3 代码实现 3 1 切分函数partition 图示 4 性能 5 快速排序优化 xff08 小数组插入排序 xff09 6 快排优化 xff08 三向切分 xff09 1 描述 快速排序也是基于递归实现的 和归
  • Kotlin进阶-6-重入锁+synchronized+volatile

    目录 1 介绍 2 线程的状态 3 创建线程 4 线程同步 4 1 可重入锁 4 2 不可重入锁的实现 4 3 可重入锁的实现 4 4 Java中的可重入锁 ReentrantLock 4 5 同步方法 synchronized 5 vol
  • Kotlin进阶-7-阻塞队列+线程池

    目录 1 阻塞队列 1 1 常见阻塞场景 2 Java中的阻塞队列 2 1 ArrayBlockingQueue 2 2 LinkedBlockingQueue 2 3 PriorityBlockingQueue 2 4 DelayQueu