JDK线程池源码分析

2023-10-30

0 概述

线程池,从字面的含义上来看,是指管理一组工作线程(Worker Thread)的资源池。线程池是与工作队列(Work Queue)密切相关的,其中在工作队列中保存了需要执行的任务。工作线程很简单:从任务队列中取出一个任务,执行任务,然后返回线程池并等待下一个任务。前面提到了线程池基本概念,那么为什么要使用线程池呢?创建线程要花费一定的的资源和时间,如果任务来了才创建线程那么响应时间会变长,而且一个进程能创建的线程数有限的。本文主要主要来分析JDK线程池的源码实现。以便我们更好的理解和使用线程池。

1 使用实例

public class ThreadPool {
    //线程池初始化corePoolSize=2,maximumPoolSize=10,keepAliveTime=600s 
    private  static Executor executor=new ThreadPoolExecutor(2, 10,
            600, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(10));
    private static AtomicInteger atomicInteger=new AtomicInteger();

    public static void main(String[] args) {
        
        for(int i=0;i<20;i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                   System.out.println(Thread.currentThread().getName());
                }
            });
        }

    }
}
输出:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3

线程池几个关键的参数说明,
1.corePoolSize 核心线程数
2.最大线程数
3.当线程池中线程数量大于corePoolSize(核心线程数量)或设置了allowCoreThreadTimeOut(是否允许空闲核心线程超时)时,线程会根据keepAliveTime的值进行活性检查,一旦超时便销毁线程
4.时间单位
5.任务队列
6.线程工厂,用于创建线程使用,建议复写,返回特定线程名,方便出问题容易追溯
7.线程池丢弃策略,异常处理的方式,可以打印出日志或者记录日志到DB、缓存、消息等

       //核心线程数
        int corePoolSize = 2;
        //最大线程数
        int maximumPoolSize = 4;
        // 60s没有使用归还给OS
        long keepAliveTime = 60;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(97),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );

2 线程池工作原理图

下图给出了线程池的工作原理图,可以看出当线程数量小于corePoolSize(核心数量)数量的时候会创建新的线程;当任务队列满时候且线程数量没有达到maximumPoolSize(最大数量)时候也会创建新的线程。

这里写图片描述

3 ThreadPoolExecutor源码分析

线程池几个重要的成员变量

  • AtomicInteger类型的ctl代表了ThreadPoolExecutor中的控制状态,是一个原子整数,通过高低位包装了两个字段:1)workerCount:线程池中当前活动的线程数量,占据ctl的低29位;2)runState:线程池运行状态,占据ctl的高3位,其包含RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态,不难发现其值递增的。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //32-3
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //-1 左移29 位 111 00000 00000000 00000000 00000000
    private static final int RUNNING    = -1 << COUNT_BITS;
     // 0 左移29位还是0
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
     // 1左移29位 001 00000 00000000 00000000 00000000
    private static final int STOP       =  1 << COUNT_BITS;
     // 2左移29位 010 00000 00000000 00000000 00000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 3左移29位 011 00000 00000000 00000000 00000000
    private static final int TERMINATED =  3 << COUNT_BITS;

    //工作队列,用于存放任务
    private final BlockingQueue<Runnable> workQueue;
    //工作线程组,存放工作线程
    private final HashSet<Worker> workers = new HashSet<Worker>();

构造函数(只是初始化赋值)

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //判断线程数量是否小于核心线程数量
        if (workerCountOf(c) < corePoolSize) {
             //创建新的线程成功后直接返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
         //程序走到这里说明数量是小于核心线程数量或新建线程失败了
          //线程池是运行状态且任务加入队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程不是运行状态然后移除任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
             //线程数量为0(还有任务需要执行,这时候创建一个新的线程)
            else if (workerCountOf(recheck) == 0)
                //重新创建一个线程(空任务)放到线程池中
                addWorker(null, false);
        }
        //队列满了,尝试重新创建线程,注意addWorker第二参数为false了
        else if (!addWorker(command, false))
            //添加失败拒绝任务
            reject(command);
    }

addWorker 方法分析(创建一个新的线程并启动)

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
               //线程数量
                int wc = workerCountOf(c);
                //core是true 和corePoolSize比较,core是false 和               maximumPoolSize比较
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                   //增加线程个数成功,退出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建新建线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程,会执行worker 对象的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

可以看出线程启动后会执行Woker类中run方法


     Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //线程传的Runnable接口是this对象
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

runWorker 线程启动,执行的方法,从代码上可有看出,1)如果线程数量大于核心线程数量且keepAliveTime时间内任务队列没有任务的时候线程会被回收;2)如果allowCoreThreadTimeOut设置为true的时候,keepAliveTime时间内任务队列没有任务时候即使线程数量小于核心线程数量也会被回收。3)如果线程数量小于等于核心线程数量将会阻塞等待任务上,也就不会出现死循环的情况(allowCoreThreadTimeOut=false)。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //task为null 且getTask()==null时候退出循环
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //线程退出,从woker中移除
            processWorkerExit(w, completedAbruptly);
        }
    }

    
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut 默认false,也就是说当线程数量大于corePoolSize 时候 timed=true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               // 如果timed 为true 就会按照keepAliveTime时间进行等待,如果等待不到直接返回null,也就是上面的runWorker 会退出,线程就会被释放,当如果timed 为
               false 会一直阻塞等待
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ;
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

JDK线程池源码分析 的相关文章

  • Datalore安装使用教程

    发现一个jetbrain出的好东西 使用体验完爆jupyter notebook以及jupyter lab的软件 就是安装有点复杂 官网写得有点不清楚 这里简单介绍一下 首先他只能在linux运行 其他环境暂时不支持 首先 去https w

随机推荐

  • react简要分析

    一 简介 前段时间看到一个用33行代码就实现了一个非常基本的react代码 感觉还是蛮有趣的 代码如下 其主要实现了两大功能 生成虚拟DOM 根据虚拟DOM渲染出真实的DOM 无注释版 https github com leontrolsk
  • linux下查看物理CPU个数、核数、逻辑CPU个数

    cat proc cpuinfo中的信息 processor 逻辑处理器的id physical id 物理封装的处理器的id core id 每个核心的id cpu cores 位于相同物理封装的处理器中的内核数量 siblings 位于
  • 消息队列中间件 - 详解RabbitMQ6种模式

    RabbitMQ 6种工作模式 对RabbitMQ 6种工作模式 简单模式 工作模式 订阅模式 路由模式 主题模式 RPC模式 进行场景和参数进行讲解 PHP代码作为实例 安装 客户端实现 添加扩展 执行composer phar inst
  • 《计算机网络—自顶向下方法》 Wireshark实验(七):以太网与ARP协议分析

    1 以太网 1 1 介绍 以太网是现实世界中最普遍的一种计算机网络 以太网有两类 第一类是经典以太网 第二类是交换式以太网 使用了一种称为交换机的设备连接不同的计算机 经典以太网 是以太网的原始形式 运行速度从 3 10 Mbps 不等 交
  • 51单片机实战 1 --四个独立按键控制四位数码管

    本文基于普中51开发板 在其例程代码稍加改动而成的 单片机的入门小项目也很益智 启动单片机 四位数码管显示0000 按下s1并松开 显示1000 再按下s1并松开显示2000 连续10次按下并松开s2 数码管显示2100 2200 2300
  • WSL安装图形界面

    效果如下 1 下载并安装VcXsrv 链接如下 https sourceforge net projects vcxsrv 下载完安装一路next即可 或者自行选择安装路径 2 安装桌面环境 安装xfce4 terminalsudo apt
  • mysql的锁

    锁 锁机制用于管理对共享资源的并发访问 用来实现事务的隔离级别 锁类型 共享锁和排他锁都是行级锁 MySQL当中事务采用的是粒度锁 针对表 B 树 页 B 树叶子 节点 行 B 树叶子节点当中某一段记录行 三种粒度加锁 共享锁 S 可理解为
  • Python进阶-----面向对象2.0(特有属性和方法与私有属性和方法)

    目录 前言 1 添加特有属性 方法 示例1 添加特有属性 示例2 添加特有方法 2 私有属性 方法 1 私有化示例 2 私有化属性 方法可以在类的内部使用 3 强制访问私有化属性 方法 4 property装饰器去操作私有属性 方法 总结
  • 【测试入门】测试用例经典设计方法 —— 因果图法

    01 因果图设计测试用例的步骤 1 分析需求 阅读需求文档 如果User Case很复杂 尽量将它分解成若干个简单的部分 这样做的好处是 不必在一次处理过程中考虑所有的原因 没有固定的流程说明究竟分解到何种程度才算简单 需要测试人员根据自己
  • 【LeetCode-面试经典150题-day24】

    目录 35 搜索插入位置 74 搜索二维矩阵 162 寻找峰值 33 搜索旋转排序数组 35 搜索插入位置 题意 给定一个排序数组和一个目标值 在数组中找到目标值 并返回其索引 如果目标值不存在于数组中 返回它将会被按顺序插入的位置 请必须
  • 详解UART、I2C、SPI常用通信协议(全是细节)

    前言 UART I2C和SPI是我们在嵌入式开发中比较常见的通信协议了 没有最好的通信协议 每个通信协议都有自己的优缺点 如果想要通信速度快 SPI 将是理想的选择 如果用户想要连接多个设备而不是过于复杂 I2C 将是理想的选择 因为它最多
  • Java fail-fast与fail-safe

    fail fast和fail safe比较 java util包下面的所有的集合类都是快速失败的 而java util concurrent包下面的所有的类都是安全失败的 快速失败的迭代器会抛出ConcurrentModificationE
  • mac如何添加新的字体格式(以word中仿宋_GB2312为例)

    注意 字体中必须出现GB 2312格式的选项 才算成功
  • C#创建TCP服务器与客户端互传消息实例

    本项目使用C 语言建立一个TCP通讯实例 并可以互相传递消息 传送一下传智播客赵老师的视频课程 关键词解释 1 TCP协议 一种可以用于网络通信的数据传输协议 传输安全可靠不会有信息丢失 重点理解三次握手与四次分手 2 线程Thread 我
  • SQL2008 附加数据库提示 5120错误

    前几天在附加数据库时 出现了这个错误 在win7 x64系统上使用sql2008进行附加数据库 包括在x86系统正在使用的数据库文件 直接拷贝附加在X64系统中 时 提示无法打开文件 5120错误 这个错误是因为没有操作权限 所以附加的时候
  • 2018-02-07 如何记录日志

    一 简介 二 记录日志的目的 why 开发调试 记录用户行为 程序运行状况 系统 机器状况 三 日志的要素 what 时间 位置 级别 内容 唯一标识 事件上下文 格式化 其他 四 记录日志的一些原则和技巧 使用框架或模块 不能出错 避免敏
  • npm run build打包产生的build文件夹通过nginx部署到服务器上访问(centos8)

    首先在当前目录下 用npm run build命令将文件打包到build文件夹 或者是其他文件夹名 把build目录传到服务器上 打开终端 提一句 Windows在Microsoft store里新出的terminal应用还蛮好用的 当然是
  • 30分钟学会React Hook, Memo, Lazy

    我们来学习React 16 8里的新特性 1 自行配置好React的环境 推荐你使用Create React APP 你也可以下载本文档Zip解压到本地直接运行 https github com yurizhang fed study bl
  • python之用scapy分层解析pcap报文(Ethernet帧、IP数据包、TCP数据包、UDP数据包、Raw数据包)

    一 工具准备 下载安装scapy库 https blog csdn net qq 23977687 article details 88046257 安装完后 ls 命令可以查看所有支持的协议 ls IP 命令列出ip协议头部字段格式 只要
  • JDK线程池源码分析

    0 概述 线程池 从字面的含义上来看 是指管理一组工作线程 Worker Thread 的资源池 线程池是与工作队列 Work Queue 密切相关的 其中在工作队列中保存了需要执行的任务 工作线程很简单 从任务队列中取出一个任务 执行任务