线程池源码分析(一)

2023-11-13

 

最近在阅读《阿里巴巴Java开发手册》的时候,书中有这么一段话:

线程池这块理解不是很深,今天就抽时间重新学习一遍。对于书中的问题分析完成后答案便一目了然。


创建线程池的一个方式:

ExecutorService e = Executors.newFixedThreadPool(5);

Executors 相当于一个工厂类,它应该是提供了一下几种类型的线程池:

1.newFixedThreadPool:创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

2.newWorkStealingPool:创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。

3.newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

4.newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

5.newSingleThreadScheduledExecutor:创建一个单例线程池,定期或延时执行任务。

6.newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。


以 FixedThreadPool 为例分析一下源码:

创建:

//Executors类中:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
//注意阻塞队列为 new LinkedBlockingQueue<Runnable>();↑

//ThreadPoolExecutor 构造函数:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

可以看出以 Executors.newFixedThreadPool(int nThreads) 这种方式创建出的线程池,阻塞队列为 LinkedBlockingQueue,线程工厂为默认工厂,拒绝策略为默认拒绝策略,等待销毁时间为0s

 

LinkedBlockingQueue构造方法:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

所以开篇的问题就迎刃而解,new LinkedBlockingQueue(),默认容量为 Integer.MAX_VALUE,也就是说,Executors.newFixedThreadPool() 这种方式创建出的线程池并没有指定阻塞队列的容量,这样是可以不断添加线程直到达到 Integer.MAX_VALUE,这就会造成内存问题。


创建好线程池后,接下来就是往里面添加任务了。而添加任务有两种方式:

1.execute();

2.submit();

submit() 可以获取该任务执行的Future。下文再说,我们先看 execute():

在这之前先看一下 ThreadPoolExecutor的属性:

//1.ctl就比较厉害了,记录了线程池的状态(高3位 & 运算) and 任务数目(低29位 & 运算)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//32-3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//1是32位,左移29位得到:高3位为001 低29位为0的数;之后再减1得到:高三位为000,低29位全为1的数
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

//下面几个状态都是:左移29位,用就是为了
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// ~COUNT_MASK取反,高3位都为1,低29位为0
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

知道这些,再看源码就没什么难的了:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        //1.获取线程池的当前状态
        int c = ctl.get();

        //2.如果工作的线程小于核心线程数,执行addWorker()创建新的线程执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();//addWorker() 是个耗时操作需要重新获取线程池状态
        }
        //3.如果线程池是 Running 状态并且成功加入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();//重新获取 value
            //如果此时发现线程池已经不是 Running并且从 workQueue中成功移除此线程,执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
}

 

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) { //自旋
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {//再一个自旋
                //通过 core 判断:是否工作线程 >= corePoolSize 或 >= maximumPoolSize
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                //1.尝试 cas 增加
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

搞完这个源码,我可算是理解多态了.....

惯例沙雕图:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

线程池源码分析(一) 的相关文章

随机推荐

  • 前端好用工具推荐-获取页面区块坐标

    最近分析网页的区块的逻辑关系和区块的坐标 找了一下相关的画图工具和插件 最后发现还是chrome牛叉 工具多 插件也多 而且非常方便好用 其中一个插件叫做 FE助手 能够方便的确定页面位置的坐标 而且使用起来相当方便 除此之外还有很多页面代
  • Stream流的使用

    目录 流介绍 流的生成 流的操作类型 流的使用 中间操作 终端操作 流介绍 流是从支持数据处理操作的源生成的元素序列 源可以是数组 文件 集合 函数 流的目的不在于保存数据 而是计算 流的生成 通常有5种方式 1 通过集合生成 List
  • 李宏毅老师机器学习选择题解析

    机器学习选择题解析加整理 项目说明 本项目是李宏毅老师在飞桨授权课程的配套问题 课程 传送门 该项目AiStudio项目 传送门 仅供学习参考 三岁出品必是精品 整理内容源于李宏毅老师机器学习课程群提问答疑解析内容 单选题 一 机器学习训练
  • nginx 禁止浏览器(www.xxxxx.com.cn/test/)访问目录,允许本地去访问目录和目录中文件

    location test return 404 解释 location 匹配浏览器的域名 区分大小写 test 域名后面跟着的目录名称 可以换成别的 return 404 也可以把 deny all 改换成 return 404 这样将返
  • TensorFlow安装使用问题集锦(不定期更新)

    记录TF安装使用过程中出现的bug进行记录与解决 1 SystemError Sonnet requires tensorflow probability minimum version 0 4 0 to be installed If u
  • 程序开发过程中的传值问题

    一 传参方式 单个值 二 传参方式 url传递多个值 用 三 传参方式 1 url传数组 2 url传多个参数 需要用 分号分割 3 案例 A页面向B页面传值几个步骤 1 先在A页面写单选提交事件传值 2 在传入页面B页面的onload里面
  • 高速下载百度网盘资料(Tampermonkey+百度网盘直链下载助手+xdown)

    下载百度网盘中的游戏 电影等文件时 由于百度自身对下载速度的限制 非VIP用户总是无法全速下载 下载速度一般在100KB s左右 如果短时间内下载文件 gt 10G还会有更严苛的下载速度限制 一般在50KB s 一周后解除限速 一旦我们想下
  • Unity5.x Animator之RootMotion

    Unity3D 中 Generic 动画导入设置和 Root Motion 之间的关系 Unity3D 的 Mecanim 动画系统可以直接复用 3DS MAX 中制作的动画文件中的位移 这个就是通过 applyRootMotion 来达成
  • UnityNative Plugin 导出时遇到的坑

    必须要在链接的输入里面添加模块定义文件 文件大概是这样 就是为了阻止名称混淆 否则UnityPluginLoad UnityPluginUnload这两个函数无法自动被Unity加载 file used by Visual Studio p
  • 死磕 java同步系列之JMM(Java Memory Model)

    简介 Java内存模型是在硬件内存模型上的更高层的抽象 它屏蔽了各种硬件和操作系统访问的差异性 保证了Java程序在各种平台下对内存的访问都能达到一致的效果 硬件内存模型 在正式讲解Java的内存模型之前 我们有必要先了解一下硬件层面的一些
  • Linux中三种引号(单引号、双引号、反引号)的区别

    1 双引号 保护特殊元字符和通配符不被shell解析 但是允许变量和命令的解析 以及转义符的解析 2 单引号 单引号内不允许任何变量 元字符 通配符 转义符被shell解析 均被原样输出 使用双引号或反斜杠转义可显示输出单引号 但是双引号和
  • 创建型设计模式之抽象工厂(Abstract Factory)模式

    定义 为创建一组相关或相互依赖的对象提供一个接口 而且无需指定他们的具体类 用意 客户端在不必指定产品的具体类型情况下 创建多个产品族中某个产品对象 定义图 参与者 抽象工厂 Creator 工厂方法核心 由一个接口或抽象类实现 具体工厂类
  • 【每日知识】点击下载按钮

    代码如下 methods OncilckUpload let link document createElement a 创建一个a标签 link style display none 将a标签隐藏 link href https api
  • $set在vue中的应用案例

    vue中this set在官方API中是这样说的 Vue set target propertyName index value 参数 Object Array target string number propertyName index
  • java api如何获取kafka所有Topic列表,并放置为一个list

    kafka内部所有的实现都是通过TopicCommand的main方法 通过java代码调用API TopicCommand main options 的方式只能打印到控制台 不能转换到一个list 下面讲解下如何转换为list 1 查看主
  • 三分钟快速搭建家纺行业小程序商城

    在互联网时代 越来越多的企业开始意识到小程序的重要性和价值 家纺行业也不例外 许多家纺企业开始关注和投入小程序商城的建设 然而 对于大多数家纺企业来说 搭建一个小程序商城可能会显得十分困难和复杂 但是 通过乔拓云平台 你只需要三分钟 就可以
  • 【Stm32野火】:野火STM32F103指南者开发板烧写官方示例程序LCD无法点亮?LCD示例程序无法使用?

    项目场景 大家好 最近在使用野火STM32F103指南者开发板的时候发现官方的示例程序LCD驱动代码居然无法直接驱动LCD点亮 这让我百思不得其解 以下就是我的踩坑填坑的过程 希望对大家有所帮助 野火官方资料下载文档链接 野火STM32F1
  • 计算机核心期刊排名及投稿信息

    1 计算机学报 北京 中国计算机学会等 2 软件学报 北京 中国科学院软件研究所 3 计算机研究与发展 北京 中国科学院计算技术研究所等 4 自动化学报 北京 中国科学院等 5 计算机科学 重庆 国家科技部西南信息中心 6 控制理论与应用
  • [Python图像处理] 三十七.OpenCV直方图统计两万字详解(掩膜直方图、灰度直方图对比、黑夜白天预测)

    该系列文章是讲解Python OpenCV图像处理知识 前期主要讲解图像入门 OpenCV基础用法 中期讲解图像处理的各种算法 包括图像锐化算子 图像增强技术 图像分割等 后期结合深度学习研究图像识别 图像分类应用 希望文章对您有所帮助 如
  • 线程池源码分析(一)

    最近在阅读 阿里巴巴Java开发手册 的时候 书中有这么一段话 线程池这块理解不是很深 今天就抽时间重新学习一遍 对于书中的问题分析完成后答案便一目了然 创建线程池的一个方式 ExecutorService e Executors newF