聊聊ThreadPoolExecutor线程池

2023-05-16

ThreadPoolExecutor是线程的池化技术,也就是首先创建几个线程,然后把线程放到池子里,有任务来的时候直接从线程池中拉线程来执行任务。为什么要用池化技术?java中的线程是系统级别的资源,创建、销毁线程都很消耗CPU的资源,有了池化技术,就可以复用线程,而不是频繁的创建、销毁。

在平常的项目开发过程中,我们可以通过Executors提供的方法创建,如newSingleThreadExecutor、newFixedThreadPool等方法,但是这些方法实际上是帮我们构建ThreadPoolExecutor对象,所以像阿里的大佬们都是不建议使用Executors来创建,而是建议自己去ThreadPoolExecutor对象,下面就来详细聊聊ThreadPoolExecutor的底层原理。

先来对构造方法的参数进行说明:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
  • corePoolSize表示核心线程数;
  • maximumPoolSize表示最大线程数,注意这里包括核心线程数和临时线程数;
  • keepAliveTime表示临时线程的存活时间(代码:workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)从队列中获取时设置超时时间,如果超时获取不到,则后续返回null,表示退出某个worker);
  • workQueue表示工作队列;
  • handler表示拒绝后的处理逻辑;

再说说整体的操作流程,如下图所示:

讲讲什么是Worker?

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable

Worker是Runnable的实现类,咱们就把它理解为线程,举个属于程序员的例子来说明,Worker就像当于程序员,比如在华为干活的就有很多程序员,一个程序员每次只能接一个任务。但是同样是在华为干活的程序员,为啥差距就那么大呢?那是因为有些程序员是华为的正规军,有些是外包的杂牌军,正规军就相当于corePool里面的核心线程,反正没有活干了或者出现危机不会被裁掉,而杂牌军则不同,没有活干了你们就回归母公司的怀抱。对比上述图形,如果有任务了,先正规军干,如果活太多干不完了,先放队列里面,队列放不下了就找杂牌军干,如果还是太多了,那对不起老子不接了;如果队列里面的任务干完了,先把杂牌军释放,留下正规军继续等活。

牛逼吹了这么多,下面一起来看看代码长啥样:

1.首先通过调用ThreadPoolExecutor的submit或execute方法,将任务添加到ThreadPoolExecutor中去执行,那么这里submit和execute方法有什么区别呢?

这里总结节点:

其一、submit底层就是调用了execute方法,只不过在调用execute方法之前,先将任务封装成了RunnableFuture类型,这里创建的是FutureTask类型的对象:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

至于在这里为什么要这么做,因为这里使用的FutureTask可以获取到任务的执行结果,具体如何使用的,感兴趣的朋友不妨百度下,或者在评论区留言交流。

其二、我们先来看下ThreadPoolExecutor的继承结构,如下:

其实submit方法是AbstractExecutorService这个抽象类提供的方法,execute方法才是ThreadPoolExecutor提供的,大家想想这里是不是使用到了模板设计模式,其实设计模式无处不在,有时间多看看大佬们写的代码,可以收获很多东西的。

其三、submit有返回值,execute没有返回值。

2.那我们就来看看execute方法,上代码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
  
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //关键点1
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //关键点2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //关键点3
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
      //关键点4
        reject(command);
}

说说几个关键点:

关键点1:

这里判断当前有几个Worker,如果worker数量小于corePoolSize则创建Worker;

关键点2:

如果大于corePoolSize个worker了,就往队列里面塞呗。

关键点3:

往队列里面塞成功了,而且是刚运行,并且worker数量为0,则创建一个worker。这里咱们掰扯掰扯,假如咱们的corePoolSize为0,那是不是表示前面还没有worker来干活啊,没人干活可不行,得创建个worker来干活。

关键点4:

如果核心worker满了(等于corePoolSize了),并且队列里面也塞满了,那就创建非核心worker,最多创建
maximumPoolSize-corePoolSize个非核心worker;如果worker和队列全满了,那就reject(拒绝)吧。

3.看看worker的庐山真面目

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{ 
    final Thread thread; 
    Runnable firstTask; 
    volatile long completedTasks;
 
    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

//ThreadPoolExecutor的方法
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//关键点1
                w.lock(); 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
				
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {//关键点2
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

 

在addWorker方法中,会创建Worker对象,当创建成功后,会执行worker对象中的线程的start方法【this.thread = getThreadFactory().newThread(this);这个thread是worker的属性,是个线程】,而该线程会执行worker的run方法,从而调用runWorker方法。这里咱们也主要看几个关键点。

关键点1:

while (task != null || (task = getTask()) != null) 这个代码块就是通过while循环自旋,从队列中获取任务,如果任务为null则推出循环,也就是释放worker,那什么情况会返回null,看关键点2。

关键点2:

关注这个判断逻辑:

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())),来详细聊聊这个条件:

这个条件可以拆成两部分:

条件一、(wc > maximumPoolSize || (timed && timedOut)

条件二、(wc > 1 || workQueue.isEmpty())

只有两个条件都为true才会返回null,只有返回null了,才会销毁worker,也就是销毁多余的worker;

看条件一、正常情况下工作数量不可能大于maximumPoolSize,timed = allowCoreThreadTimeOut || wc > corePoolSize;allowCoreThreadTimeOut没有设置情况下为false,所以当前工作worker数量大于核心线程数量满足条件;

再看timedOut什么时候为true,当time=true时,调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)从队列中获取数据,如果超过keepAliveTime时间还没获取到数据,则timedOut=true,那什么时候获取不到数据?正常情况下是队列中没有任务才会获取不到任务。

看条件二、当workQueue.isEmpty()为空时满足条件;

综上,我们可以看出,当worker数量大于核心线程数量(也就是有杂牌军),并且队列中没有任务时,就满足了条件,满足条件表明将销毁woker。因为这里必须是大于核心线程数才会销毁,所以也就表明了会保留核心线程数个worker。

ok,眼睛花了,也写的差不多了,欢迎大家评论区留言,如果对你有帮助,也可以帮忙转发转发,点点赞!

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

聊聊ThreadPoolExecutor线程池 的相关文章

  • 2020-11-24

    如何才能撰写出好的短视频文案 首先是要调起用户的共鸣 xff0c 比如以前抖音上很火的一个视频 视频拍的很简单 xff0c 就是坐在出租车上拍摄的一些 xff0c 呼啸而过的高楼大厦和车流 xff0c 以及灰色的天空和不断向后跑的树 视频和
  • 2020-11-26

    短剧本创作小技巧 xff0c 提升完播率 相信每一个编剧在自己的短剧本拍成视频时 xff0c 都会有满足感和成就感 但有的时候 xff0c 也会辗转难眠 xff0c 而其中遇到了最大问题就是完播率太低 而影响完播率的关键所在 xff0c 就
  • 2020-11-28

    短剧本中的场景选择 一般在写短剧本的时候 xff0c 一个故事里面会有很多的故事场景 xff0c 场景写的比较好就会使短剧本在转换为段视频的时候 xff0c 制作的短视频会更加的好看 故事的场景也算得上是一个故事的主要开端 xff0c 开端
  • 基于C++11实现的阻塞队列(BlockQueue)

    思路 xff1a 生产者消费者模型 如图 xff0c 多个生产者线程和多个消费者线程共享同一固定大小的缓冲区 xff0c 它们的生产和消费符合以下规则 xff1a 生产者不会在缓冲区满的时候继续向缓冲区放入数据 xff0c 而消费者也不会在
  • 航空订票系统设计(java、数据库、通信联合项目)

    航空订票系统设计 xff08 java 数据库 通信联合项目 xff09 最近帮高中同学做的一个学校项目 xff0c 这个项目主要是用Java写的 前期主要搭建五个类 xff0c Order xff08 选择 xff09 Passenger
  • java基础之IO流中的字节流

    目录 一 xff1a 字节流写数据 二 xff1a 字节流写数据的方式 三 xff1a 字节流写数据实例 字节流写数据步骤 字节流写数据例子 字节流追加写数据 四 xff1a 字节流读取数据 五 xff1a 字节读取数据的方法 六 xff1
  • java基础之转换流

    目录 一 xff1a 解释 二 xff1a 转换流输入流 1 xff1a 构造方法 2 xff1a InputStreamReader读数据方法 3 xff1a 例子 3 xff1a 注意 三 转换流输出流 1 xff1a 构造方法 2 x
  • java基础之多线程的引入

    目录 一 xff1a 多线程概述 进程 线程 举例 二 xff1a 并行与并发 三 xff1a Java程序的运行原理 四 xff1a JVM启动的时候是单线程还是多线程呢 xff1f 一 xff1a 多线程概述 进程 正在运行的程序 xf
  • 电信运营商云计算体系架构分析

    电信运营商云计算体系架构分析 作者 xff1a 成晓旭 xff08 版权保留 欢迎转载 xff09 第三篇 xff1a 体系架构分析 电信运营商云计算发展分析之一 xff1a 战略定位分析 xff0c 可供参考 电信运营商云计算发展分析之二
  • java基础之线程安全问题(一)

    目录 一 xff1a 线程安全判断依据 二 xff1a 解决线程安全问题实现 同步代码块 1 格式 2 同步代码块的锁对象是谁 xff1f 3 xff1a 同步方法的时候 xff0c 锁对象又是谁呢 xff1f 4 xff1a 静态同步方法
  • java基础之线程安全问题(二)

    目录 一 xff1a Lock锁的使用 xff08 解决同步问题 xff09 二 xff1a 例子 三 xff1a 同步的特点 xff08 1 xff09 同步的前提 xff08 2 xff09 同步的好处 xff08 3 xff09 同步
  • redis基础知识

    目录 一 xff1a 概念 二 xff1a NoSQL分类 三 xff1a Redis数据模型 四 xff1a 键Key 一 xff1a 概念 开源的 xff08 BSD协议 xff09 xff0c 使用ANSI C 编写 xff0c 基于
  • Hive建表以及导入数据

    目录 一 xff1a 内部表和外部表 1 xff1a 外部表 2 xff1a 外部表 3 xff1a 外部表和内部表区别 二 xff1a 上传数据方式 一 xff1a 内部表和外部表 1 xff1a 外部表 内部表基础建表语句一 默认指定文
  • Hive行列互转

    目录 一 xff1a 行转列 1 xff1a lateral view 行转列 2 xff1a explode函数 3 xff1a 例子 二 xff1a 列转行 1 概念 2 xff1a 例子 一 xff1a 行转列 1 xff1a lat
  • Hive行转列的应用之计算公司累加收入

    公司代码 年度 1月 12月的收入金额 burk year tsl01 tsl02 tsl03 tsl04 tsl05 tsl06 tsl07 tsl08 tsl09 tsl10 tsl11 tsl12 853101 2010 100200
  • Hive的常用函数

    目录 1 关系运算 2 数值计算 3 条件函数 4 日期函数 5 xff1a 字符串函数 6 xff1a Hive 中的wordCount 1 关系运算 等值比较 61 61 61 lt 61 gt 不等值比较 61 lt gt 区间比较
  • IDEA配置spring环境,并简单测试

    目录 一 下载安装需要的依赖包 二 IDEA的部署 一 下载安装需要的依赖包 首先 xff0c 我们需要下载一点jar包 第一个jar包是第三方依赖的jar包 xff0c Spring的核心容器依赖commons logging的JAR包
  • Unity导出工作台(Console)数据

    首先在Unity中添加C 脚本 xff1a using System Collections using System Collections Generic using UnityEngine using UnityEditor usin
  • Unity利用代码生成空心立方体(立方体挖走一个圆柱)

    先看效果 还未生成mesh时挖去圆柱的立方体 生成mesh后挖去圆柱的立方体 放代码为敬 xff08 脚本挂在空物体上即可 xff09 xff1a using System Collections using System Collecti
  • 企业ERP系统开发总结及建议

    企业ERP系统开发总结及建议 作者 xff1a 成晓旭 对于像我们这种规模的大型公司 xff0c 自己建设 实施和维护满足公司特定管理要求的管理信息系统 xff0c 是目前部分大型公司建设企业ERP 的常见思路 比如 xff1a XXXX

随机推荐

  • Unity 3D导入txt文本文件坐标并打印

    先看打印结果 xff1a 首先我们创建一个txt文件 xff0c 将坐标输入或复制进去 xff0c 从左到右依次为x y z xff0c 中间用逗号 xff08 英文逗号 xff09 隔开 在Unity工程文件下的Assets文件夹下创建R
  • 2022研究生数学建模竞赛(华为杯)B题

    题目 xff1a 方形件组批优化问题 一 背景介绍 智能制造被 中国制造 2025 列为主攻方向 而个性化定制 更短的产品及系统生命周期 互联互通的服务模式等成为目前企业在智能制造转型中的主要竞争点 以离散行业中的产品为例 xff0c 如电
  • 2022研究生数学建模B题思路

    子问题1 xff1a 排样优化问题 要求建立混合整数规划模型 xff0c 在满足生产订单需求和相关约束条件下 xff0c 尽可能减少板材用量 约束 1 在相同栈 stack xff09 里的产品项 item xff09 的宽度 xff08
  • 找到并标记Mesh顶点

    1 在Unity 3D中新建一个物体 本文以Cube为例 2 创建一个C 脚本 命名为MeshTest 3 在脚本中写入程序 在打开的脚本 MeshTest 上编写代码 xff0c 首先获取 MeshFilter 组件 xff0c 然后获取
  • TCP服务器端、客户端通讯(赋源码)

    实现通讯 xff0c 我们首先要知道是怎么样的一个流程 xff0c 下图是我画的一个通讯流程图 xff1a 一 Linux服务器端 我是在Ubuntu20 04下进行的 xff0c 使用的是C 43 43 xff0c 引入头文件socket
  • win11 命令 wmic:无效的指令 解决办法

    我想你肯定看到过让你修改环境变量的方法 但是 xff0c 如果你的电脑就根本没有装wmic xff0c 再怎么修改环境变量也是徒劳 我们打开设置 xff1a Win 43 I 点击应用 选择 可选功能 添加可选功能 搜索wmic xff0c
  • Python面向对象编程:关于类的方法中属性是否加前缀self的问题

    问题的缘起 今天完成了LeetCode首秀 xff08 而且是用刚学不久的python做的 xff09 xff0c 心情挺激动的 xff0c 毕竟之前只涉猎了竞赛OJ xff0c 没有在应用型平台上刷过题 xff0c 不妨定一个小目标 xf
  • 概念物理Ⅱ 第一讲:绪论

    目录 物理学研究的对象和动机为什么要研究物理学 xff1f 1 满足人类对大自然基本规律的好奇心2 改进技术 发展生产 两个动机之间的关系 物理学的发展历史起源于古希腊古典物理学时间 xff1a 牛顿时代 1900年经典力学 xff08 牛
  • Machine Learning:k近邻算法(KNN)

    目录 写在前面的话k 近邻算法概述优点缺点适用数据范围 原理Python代码实现Sklearn直接调用weights选项algorithm选项 算法测试与结果评价原理及方法函数主要参数说明Python代码实现 示例反思与总结 写在前面的话
  • 《Python爬虫技术:深入理解原理、技术与开发》读书笔记(一)

    目录 前言第1章 基础知识第2章 爬虫基础HTTP基础URL与URI超文本HTTP与HTTPSHTTP的请求过程Network面版 前言 这是本系列的第一篇文章 xff0c 文如其题 xff0c 这个系列旨在学习Python爬虫技术 本系列
  • 给软件工程师的自学建议

    给软件工程师的自学建议 与现在大学生的情况类似 xff0c 学校学的专业知识总是与实际工作中需要的知识相差甚远 或许进入我们这个行业就注定要一辈子不离书本 不离学习了 由于软硬件技术的推陈出新 xff0c 学校教的C Basic Pasca
  • Python基础入门—for循环

    Python基础入门 for循环 for 循环 xff1a range的使用 xff1a 循环控制语句 xff1a for else的使用 xff1a for循环嵌套 xff1a for 循环 xff1a for循环格式 xff1a for
  • 软件测试之项目总结全攻略

    在我们测试工作过程中 xff0c 由于公司业务发展 xff0c 快速迭代等原因 xff0c 我们遇到的项目以小项目居多 更新界面元素 xff0c 上个活动页 xff0c 优化一下原有的功能等等 xff0c 加上事情繁琐 xff0c 任务多
  • 教你用Python写一个京东自动下单抢购脚本(Python实现京东自动抢购)

    很多朋友都有网购抢购限量商品的经历 有时候蹲点抢怎么也抢不到 今天小编带你们学习怎么用Python写一个京东自动下单抢购脚本 以后再也不用拼手速拼网速啦 快来一起看看吧 1 问题背景 经过无数次抢购失败后 xff0c 发现商家会不定时的放出
  • JAVA基础题练习

    顺序插入 xff1a 插入有序 Scanner input 61 new Scanner System in char arr2 61 39 b 39 39 d 39 39 f 39 39 i 39 39 k 39 39 m 39 39 x
  • ubuntu20.04安装qq音乐并解决闪退问题

    在qq音乐官网下载linux版coco音乐 xff1a https y qq com download download html 下载deb包并且通过下面命令行安装 xff1a span class token function sudo
  • 架构师装逼核武器

    架构师这个职位是很多程序猿的梦想 xff0c 我有很多朋友私下和我聊天的时候 xff0c 曾多次问我要如何才能成为一个架构师 xff0c 对于这个问题 xff0c 我只能粗略的谈谈我个人的观点 xff0c 如有不同观点 xff0c 欢迎交流
  • 一文讲透java日志框架

    在项目开发过程中 xff0c 有一个必不可少的环节就是记录日志 xff0c 相信只要是个程序员都用过 xff0c 可是咱们自问下 xff0c 用了这么多年的日志框架 xff0c 你确定自己真弄懂了日志框架的来龙去脉嘛 xff1f 下面笔者就
  • HashMap底层原理

    在我们实际的项目中 xff0c HashMap这个集合类经常被用到 xff0c 可是就是这么一个常用的集合类 xff0c 却往往成了咱们面试中的绊脚石 即便你是个初级程序员 xff0c 也常会让你谈谈HashMap的底层原理 xff0c 今
  • 聊聊ThreadPoolExecutor线程池

    ThreadPoolExecutor是线程的池化技术 xff0c 也就是首先创建几个线程 xff0c 然后把线程放到池子里 xff0c 有任务来的时候直接从线程池中拉线程来执行任务 为什么要用池化技术 xff1f java中的线程是系统级别