Java线程池源码解析及使用

2023-11-06

1、线程池的用处

Java 引入 Excutor 框架将任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池即可。

使用线程池的时机:

  • 单个任务处理时间比较短
  • 需要处理的任务数量很大

线程池的优点:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2、Executor接口

思想:将任务提交和任务执行解耦。无需关注如何创建线程,如何调度线程来执行任务,用户只需提供实现Runnable接口的任务对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。

线程池的运行主要分成两部分:任务管理、线程管理

任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:

  1. 直接申请线程执行该任务;
  2. 缓冲到队列中等待线程执行;
  3. 拒绝该任务。

线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

 3、线程池实现类 ThreadPoolExecutor 的构造方法参数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize :核心线程数目 (最多保留的线程数)
    • 当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
    • 核心线程数线程数定义了最小可以同时运行的线程数量。即便当前活动的线程有空闲的,只要这个活动的线程数量小于设定的核心线程数,那么依旧会启动一个新线程来执行任务,也就是说不会去复用任何线程。
  • maximumPoolSize:线程池中允许的最大线程数。
    • 如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize;
  • keepAliveTime :生存时间(针对救急线程) 
    • 当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime 才会被回收销毁
  • unit: keepAliveTime 参数的时间单位
  • workQueue: 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
    • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按先进先出(FIFO)排序任务,支持公平锁和非公平锁;
    • LinkedBlockingQuene:基于链表结构的有界阻塞队列,按 FIFO 排序任务,默认的长度为 Integer.MAX_VALUE;
    • SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态;
    • PriorityBlockingQuene:支持线程优先级排序的无界阻塞队列,默认自然序进行排序,也可以自定义 compareTo() 方法来指定排序规则,不能保证同优先级元素的顺序;
  • threadFactory: 线程工厂
    • 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,方便出错时进行回溯
  • handler: 拒绝策略。当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

4、线程池的属性标识

4.1、线程池核心属性

//原子整型。1.声明当前线程池的状态;2.声明线程池中的线程数
//高3位是线程池的状态,低29位是线程池中的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //线程池中线程的最大数量2^29-1

// 高3位是线程池的状态
//-1用二进制表示就是32个1,左移29位是高3位是1,低29位是0
 
//111,代表线程池为RUNNING,代表正常接收任务
private static final int RUNNING    = -1 << COUNT_BITS; 
//000,代表线程池为SHUTDOWN状态,不接受新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也正常处理
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//001 代表线程池为STOP状态,不接受新任务,也不去处理阻塞队列中的任务,同时会中断正在执行的任务
private static final int STOP       =  1 << COUNT_BITS;  
//010 代表线程池为TIDYING状态,过渡的状态,代表线程池即将关闭
private static final int TIDYING    =  2 << COUNT_BITS; 
//011 TERMINATED状态,代表线程池已经关闭
private static final int TERMINATED =  3 << COUNT_BITS;
//~为取反运算符,~CAPACITY代表高3位是1,低29位是0,得到线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获得线程池中工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//rs为高3位,表示线程池状态;wc是低29位,ctl是合并的结果
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl 这个 AtomicInteger 类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高 3 位保存 runState,低29位保存 workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。线程池源代码中,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

4.2、线程池状态

从前面每个状态对应的数值能得到:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,状态转换是由低到高的(-1 < 0 < 1 < 2 < 3)

在这里插入图片描述

5、任务执行机制

5.1 任务调度

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。这部分就是线程池的核心运行机制。

所有任务的调度都是由 execute 方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

执行流程:

  1. 线程池当前线程数如果小于核心线程数,则调用 addWorker(commond,true)方法创建新的线程执行任务,否则执行步骤2;
  2. 步骤1失败,说明工作线程数 > 核心线程数,那么考虑将任务放入阻塞队列,等待执行完任务的线程来处理。基于此,判断线程池是否处于Running状态(只有Running状态的线程池可以接受新任务),如果任务添加到任务队列成功则进入步骤3,失败则进入步骤4;
  3. 来到这一步需要说明任务已经加入任务队列,这时要二次校验线程池的状态,会有以下情形:
    • 线程池不再是Running状态了,需要将任务从任务队列中移除,如果移除成功则拒绝本次任务
    • 线程池是Running状态,判断线程池工作线程是否为0,是则调用 addWorker(commond,true)添加一个没有初始任务的线程(这个线程将去获取已经加入任务队列的本次任务并执行),否则进入步骤4;
    • 线程池不是Running状态,但从任务队列移除任务失败,结束操作
  4. 说明阻塞队列已满,将线程池扩容至maximumPoolSize并调用 addWorker(commond,false)方法创建新的线程执行任务,失败则拒绝本次任务。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    //获取当前工作线程数和线程池运行状态(共32位,前3位为运行状态,后29位为运行线程数)
    int c = ctl.get();
    // 1. 工作线程数 < 核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //创建核心线程执行任务,成功就返回
        if (addWorker(command, true))
            return;
        //创建核心线程数失败,重新获取ctl
        c = ctl.get();
    }

    // 2. 工作线程数 > 核心线程数

    // 如果当前线程池状态为RUNNING,并且任务成功添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查,因为从上次检查到进入此方法,线程池可能已成为SHUTDOWN状态
        // 如果当前线程池状态不是 RUNNING,则从队列删除任
        if (! isRunning(recheck) && remove(command))
            reject(command); //拒绝策略
        // 当线程池中的工作线程数为 0 时,此时阻塞队列中还有待执行的任务,核心线程数可能为 0
        else if (workerCountOf(recheck) == 0)
            // 新增一个线程消费阻塞队列中的任务
            addWorker(null, false);
    }

    // 3.阻塞队列已满

    // 尝试增加工作线程执行任务
    else if (!addWorker(command, false))
        reject(command); //拒绝策略
}

5.2、任务缓冲

  • 线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
  • 阻塞队列 (BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

5.3 任务申请

任务的执行有两种可能:

  • 一种是任务直接由新创建的线程执行。
  • 一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。

线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask 方法实现。

getTask() 方法是为runWorker(Worker w)方法服务的,它的作用就是在任务队列(workQueue)中获取 task(Runnable)。

执行流程

  1. 将timedOut(上次获取任务是否超时)置为false(首次执行方法,无上次,自然为false),进入一个无限循环
  2. 如果线程池为Shutdown状态且任务队列为空(线程池shutdown状态可以处理任务队列中的任务,不再接受新任务,这个是重点)或者线程池为STOP或TERMINATED状态,则意味着线程池不必再获取任务了,当前工作线程数量-1并返回null,否则进入步骤3
  3. 如果线程池数量超限制或者时间超限且(任务队列为空或当前线程数>1),则进入步骤4,否则进入步骤5。
  4. 移除工作线程,成功则返回null,不成功则进入下轮循环。
  5. 尝试用poll() 或者 take()(具体用哪个取决于timed的值)获取任务,如果任务不为空,则返回该任务。如果为空,则将timeOut 置为 true进入下一轮循环。如果获取任务过程发生异常,则将 timeOut置为 false 后进入下一轮循环。
private Runnable getTask() {
    // 表示线程是否空闲时间超时了
    boolean timedOut = false;
    // 自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
        // 1) 线程池关闭了,并且工作队列里的任务都完成了
        // 2) 线程池直接进入了 STOP 或更进一步的状态,就不返回新任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // 工作线程数-1
            return null;
        }
 
        int wc = workerCountOf(c); //获取当前工作线程
 
        // 判断当前是否需要进行超时控制,核心线程是否超时(默认false)或当前是否存在非核心线程,
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
        // 如果当前线程数大于最大线程数 或 (允许超时控制 且 当前发生了空闲时间超时))
        // 且当前线程数 > 1 或 阻塞队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 工作线程数-1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
 
        try {
            // 获取任务,从任务队列头部取出一个任务
            // 允许超时控制:poll ,不允许超时控制:take(会阻塞)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果获取不到任务,说明非核心线程超时了,下一轮判断确认是否退出循环。
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

5.4 任务拒绝

  • 任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池

6、Worker线程管理

6.1、Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程 Worker。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    // 该worker正在运行的线程
    final Thread thread;
    
    // 将要运行的初始任务
    Runnable firstTask;
    
    // 每个线程的任务计数器
    volatile long completedTasks;
 
    // 构造方法   
    Worker(Runnable firstTask) {
        setState(-1); // 调用runWorker()前禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建一个线程
    }
 
    // 实现了Runnable接口的run方法
    public void run() {
        runWorker(this);
    }
    
    ... // 此处省略了其他方法
}

Worker 是 ThreadPoolExecutor类的内部类,实现了 Runnable 接口,并持有一个线程 thread,一个初始化的任务 firstTask 。thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,可以用来执行任务;firstTask 用它来保存传入的第一个任务::

  • 如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;
  • 如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

6.2、Worker线程增加

addWorker 方法:向线程池添加一个带有任务的工作线程。

addWorker 方法有两个参数:firstTask、core。

  • firstTask :新创建的线程应该首先运行的任务(如果没有,则为空)
  • core :该参数决定了线程池容量的约束条件,即当前线程数量以何值为极限值。参数为 true 则使用 corePollSize 作为约束值,否则使用 maximumPoolSize

执行流程:

1、外层循环判断线程池的状态是否可以新增工作线程。这层校验基于下面两个原则:

  • 线程池为Running状态时,既可以接受新任务也可以处理任务
  • 线程池为关闭状态时只能新增空任务的工作线程(worker)处理任务队列(workQueue)中的任务不能接受新任务

2、内层循环向线程池添加工作线程并返回是否添加成功的结果。

  • 首先校验线程数是否已经超限制,是则返回false,否则进入下一步
  • 通过CAS使工作线程数+1,成功则进入步骤3,失败则再次校验线程池是否是运行状态,是则继续内层循环,不是则返回外层循环

3、核心线程数量+1成功的后续操作:添加到工作线程集合,并启动工作线程

  • 首先获取锁之后,再次校验线程池状态(线程池为Running状态时或为ShutDown但为空任务的线程),通过则进入下一步,未通过则添加线程失败
  • 线程池状态校验通过后,再检查线程是否已经启动,是则抛出异常,否则尝试将线程加入线程池
  • 检查线程是否启动成功,成功则返回true,失败则进入 addWorkerFailed 方法
private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // 循环退出标志位
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状态为RUNNANLE 或 线程池状态为SHUTDOWN但阻塞队列不为空 跳过
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 换成更直观的条件语句
            // (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
            // 返回false的条件就可以分解为:
            //(1)线程池状态为STOP,TIDYING,TERMINATED
            //(2)线程池状态为SHUTDOWN,且要执行的任务不为空
            //(3)线程池状态为SHUTDOWN,且阻塞队列为空
            return false;

        // CAS 自旋增加线程个数
        for (;;) {
            int wc = workerCountOf(c); //获取工作线程个数
            // 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false; // 创建工作线程失败
            
            // 执行cas操作,线程个数 + 1
            if (compareAndIncrementWorkerCount(c))
                break retry; // 添加成功,退出外层循环
            // 通过cas添加失败
            c = ctl.get();  
            // 判断线程池状态是否变化,,
            if (runStateOf(c) != rs)
                // 变化则跳到外层循环重试,重新获取线程池状态
                continue retry;
            // 否则内层循环重新cas
        }
    }

    // 简单总结上面的CAS过程:
    //(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否则进行cas
    //(2)cas成功则退出双循环,cas失败了,要看当前线程池的状态是否变化了
    //(3)如果变了,则进入外层循环重新获取线程池状态,否则进入内层循环继续进行cas

    // 走到这里说明cas成功,线程数+1,但并未被执行
    boolean workerStarted = false; // 工作线程调用start()方法标志
    boolean workerAdded = false; // 工作线程被添加标志
    Worker w = null;
    try {
        w = new Worker(firstTask);  //创建工作线程
        final Thread t = w.thread; // 获取工作线程持有的线程实例
        if (t != null) {
            //获取线程池的全局锁,避免该工作线程w添加任务时,其他线程干掉了线程池,干掉线程池前需要先获取这个锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());  // 获取当前线程池状态
                // 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 检查线程是否处于活跃状态
                        throw new IllegalThreadStateException();
                    // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有
                    workers.add(w);
                    int s = workers.size(); //获取工作线程个数
                    //如果工作线程数大于之前记录的最大工作线程数,就替换
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; //添加工作线程成功
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); //启动工作线程 runWorker(this);
                workerStarted = true; //启动工作线程成功
            }
        }
    } finally {
        if (! workerStarted)  // 如果线程启动失败,则执行addWorkerFailed方法
            addWorkerFailed(w);
    }
    //返回工作是否启动
    return workerStarted;
}

6.3、Worker线程执行任务

runWorker(Worker w) 是线程池中真正处理任务的方法,前面的 execute() 和 addWorker() 都是在为该方法做准备和铺垫。

执行流程:

  1. 判断当前任务或者从任务队列中获取的任务是否不为空,都为空则进入步骤2,否则进入步骤3
  2. 任务为空,则将completedAbruptly置为false(即线程不是突然终止),并执行processWorkerExit(w,completedAbruptly)方法进入线程退出程序
  3. 任务不为空,则进入循环,并加锁
  4. 判断是否为线程添加中断标识,以下两个条件满足其一则添加中断标识:
    1. 线程池状态>=STOP,即STOP或TERMINATED
    2. 一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED)
  5. 执行前置方法 beforeExecute(wt, task)(该方法为空方法,由子类实现)后执行task.run() 方法执行任务(执行不成功抛出相应异常)
  6. 执行后置方法 afterExecute(task, thrown)(该方法为空方法,由子类实现)后将线程池已完成的任务数+1,并释放锁。
  7. 再次进行循环条件判断。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 获取工作线程中用来执行任务的线程实例
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // status设置为0,允许中断
    boolean completedAbruptly = true; // 线程意外终止标志
    try {
        //1. 任务不为空,执行任务
        //2. 任务为空,通过 getTask 方法从阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 加锁,保证下方临界区代码的线程安全
            // 如果线程池状态 >= STOP且当前线程还没有被中断,则主动中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); // 中断当前线程
            try {
                // 任务执行前的回调,空实现,可以在子类中自定义
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //开始执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后的回调,空实现,可以在子类中自定义
                    afterExecute(task, thrown);
                }
            } finally {
                task = null; // 将循环变量task设置为null,表示已处理完成
                w.completedTasks++; // 当前已完成的任务数+1
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //线程执行完毕的后续处理
        processWorkerExit(w, completedAbruptly);
    }
}

processWorkerExit:执行线程退出的方法

参数说明:

  1. Worker w:要结束的工作线程。
  2. boolean completedAbruptly: 是否突然完成(异常导致),如果工作线程因为用户异常死亡,则completedAbruptly参数为 true

执行流程:

  1. 如果 completedAbruptly 为 true,即工作线程因为异常突然死亡,则执行工作线程-1操作。
  2. 主线程获取锁后,线程池已经完成的任务数追加 w(当前工作线程) 完成的任务数,并从worker的set集合中移除当前worker。
  3. 根据线程池状态进行判断是否执行tryTerminate()结束线程池。
  4. 是否需要增加工作线程,如果线程池还没有完全终止,仍需要保持一定数量的线程。
    1. 如果当前线程是突然终止的,调用addWorker()创建工作线程
    2. 当前线程不是突然终止,但当前工作线程数量小于线程池需要维护的线程数量,则创建工作线程。需要维护的线程数量为corePoolSize(取决于成员变量 allowCoreThreadTimeOut是否为 false)或1。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1.工作线程-1操作
     * 1)如果completedAbruptly 为true,说明工作线程发生异常,那么将正在工作的线程数量-1
     * 2)如果completedAbruptly 为false,说明工作线程无任务可以执行,由getTask()执行worker-1操作
     */
        if (completedAbruptly) 
            decrementWorkerCount();

        // 工作线程正常结束
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 将该worker已完成的任务数追加到线程池已完成的任务数
            completedTaskCount += w.completedTasks;
            // 从线程set集合中移除工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 根据线程池状态进行判断是否结束线程池
        tryTerminate();

        int c = ctl.get();
        // 如果线程池状态是running 或 shutdown
        if (runStateLessThan(c, STOP)) {
            // 如果线程是突然终止的,调用addWorker()创建工作线程
            // 如果当前线程不是突然终止,但当前工作线程数量小于线程池需要维护的线程数量,则创建工作线程
            if (!completedAbruptly) {
                // min为线程池需要维护的线程数量,
                // 允许核心线程超时销毁,min = 0,否则为核心线程数
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0且阻塞队列不为空
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 当前工作线程数>=线程池需要维护的线程数量,不需要创建新的线程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

7、其他方式创建线程池 

方式一:通过 ThreadPoolExecutor 构造函数实现(推荐)

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

方式二:通过 Executor 框架的工具类 Executors 来实现我们可以创建三种类型的 ThreadPoolExecutor :

  • FixedThreadPool
  • CachedThreadPool
  • SingleThreadExecutor

newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    // 使用的是默认工厂和拒绝策略。
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

特点:

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间 
  • 阻塞队列是无界的,可以放任意数量的任务

适用于任务量已知,相对耗时的任务

使用的是默认工厂,可以看到创建的是非守护线程

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

我们可以自定义工厂,重写newThread方法

    ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
        private AtomicInteger t = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"mypool-" + t.getAndIncrement());
        }
    });

 输出

09:22:50.927 [mypool-1] DEBUG p8_1.testThreadPool.Test1 - 1
09:22:50.927 [mypool-2] DEBUG p8_1.testThreadPool.Test1 - 2
09:22:50.931 [mypool-1] DEBUG p8_1.testThreadPool.Test1 - 3

newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 特点:

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的
    SynchronousQueue<Integer> integers = new SynchronousQueue<>();
    new Thread(() -> {
        try {
            log.debug("putting {} ", 1);
            integers.put(1);
            log.debug("{} putted...", 1);
            log.debug("putting {} ", 2);
            integers.put(2);
            log.debug("{} putted...", 2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"t1").start();

    TimeUnit.SECONDS.sleep(1);

    new Thread(() -> {
        try {
            log.debug("taking {}", integers.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"t2").start();

    TimeUnit.SECONDS.sleep(1);

    new Thread(() -> {
        try {
            log.debug("taking {}", integers.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"t3").start();
}

输出

09:32:53.238 [t1] DEBUG p8_1.testThreadPool.Test2 - putting 1 
09:32:54.237 [t1] DEBUG p8_1.testThreadPool.Test2 - 1 putted...
09:32:54.237 [t2] DEBUG p8_1.testThreadPool.Test2 - taking 1
09:32:54.238 [t1] DEBUG p8_1.testThreadPool.Test2 - putting 2 
09:32:55.244 [t1] DEBUG p8_1.testThreadPool.Test2 - 2 putted...
09:32:55.244 [t3] DEBUG p8_1.testThreadPool.Test2 - taking 2

整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

使用场景:

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改。
// 强转为ThreadPoolExecutor
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
// 改变核心线程数
threadPool.setCorePoolSize(2)

装饰者模式具体体现:内部调用了 ThreadPoolExecutor 的构造方法,传入的 corePoolSize 和maximumPoolSize 都为1,然后将该对象传给了 FinalizableDelegatedExecutorService,该类修饰了 ThreadPoolExecutor,让外部无法调用 ThreadPoolExecutor 内部的某些方法来修改所创建的线程池的大小。

public class Executors {

    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
}

测试:任务执行失败,线程池重新创建了一个新的线程

    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
        log.debug("{}",1 / 0);
    });
    pool.execute(() -> {
        log.debug("1");
    });
    pool.execute(() -> {
        log.debug("2");
    });
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at p8_1.testThreadPool.Test3.lambda$main$0(Test3.java:17)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
10:29:45.500 [pool-1-thread-2] DEBUG p8_1.testThreadPool.Test3 - 1
10:29:45.503 [pool-1-thread-2] DEBUG p8_1.testThreadPool.Test3 - 2

8、提交任务

    // 执行任务
    void execute(Runnable command);

    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);

    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;

    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;

    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;

    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;

测试submit

    ExecutorService pool = Executors.newFixedThreadPool(2);

    Future<String> future = pool.submit(() -> {
        log.debug("running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ok";
    });

    log.debug("{}",future.get());
10:33:20.269 [pool-1-thread-1] DEBUG p8_1.testThreadPool.Test4 - running
10:33:21.284 [main] DEBUG p8_1.testThreadPool.Test4 - ok

测试invokeAll

    ExecutorService pool = Executors.newFixedThreadPool(2);

    List<Future<String>> futures = pool.invokeAll(Arrays.asList(
            () -> {
                log.debug("beging");
                TimeUnit.SECONDS.sleep(1);
                return "1";
            },
            () -> {
                log.debug("beging");
                TimeUnit.SECONDS.sleep(2);
                return "2";
            },
            () -> {
                log.debug("beging");
                TimeUnit.SECONDS.sleep(3);
                return "3";
            }
    ));

    futures.forEach( future -> {
        try {
            log.debug("{}",future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
10:39:01.332 [pool-1-thread-2] DEBUG p8_1.testThreadPool.Test5 - beging
10:39:01.332 [pool-1-thread-1] DEBUG p8_1.testThreadPool.Test5 - beging
10:39:02.337 [pool-1-thread-1] DEBUG p8_1.testThreadPool.Test5 - beging
10:39:05.341 [main] DEBUG p8_1.testThreadPool.Test5 - 1
10:39:05.343 [main] DEBUG p8_1.testThreadPool.Test5 - 2
10:39:05.343 [main] DEBUG p8_1.testThreadPool.Test5 - 3

测试invokeAny

    ExecutorService pool = Executors.newFixedThreadPool(2);

    String result = pool.invokeAny(Arrays.asList(
            () -> {
                log.debug("beging 1");
                TimeUnit.SECONDS.sleep(2);
                log.debug("end 1");
                return "1";
            },
            () -> {
                log.debug("beging 2");
                TimeUnit.SECONDS.sleep(1);
                log.debug("end 2");
                return "2";
            },
            () -> {
                log.debug("beging 3");
                TimeUnit.SECONDS.sleep(3);
                log.debug("end 3");
                return "3";
            }
    ));

    log.debug("{}",result);
10:40:12.286 [pool-1-thread-1] DEBUG p8_1.testThreadPool.Test6 - beging 1
10:40:12.286 [pool-1-thread-2] DEBUG p8_1.testThreadPool.Test6 - beging 2
10:40:13.289 [pool-1-thread-2] DEBUG p8_1.testThreadPool.Test6 - end 2
10:40:13.289 [pool-1-thread-2] DEBUG p8_1.testThreadPool.Test6 - beging 3
10:40:13.289 [main] DEBUG p8_1.testThreadPool.Test6 - 2

8、关闭线程池

1)shutdown

线程池状态变为 SHUTDOWN 

  • 不会接收新任务
  • 但已提交任务会执行完
  • 此方法不会阻塞调用线程的执行 
void shutdown();

测试 

    ExecutorService pool = Executors.newFixedThreadPool(2);

    pool.submit(() -> {
        log.debug("task 1 running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task 1 finish");
    });

    pool.submit(() -> {
        log.debug("task 2 running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task 2 finish");
    });

    pool.submit(() -> {
        log.debug("task 3 running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task 3 finish");
    });

    log.debug("shutdown");
    pool.shutdown();

    pool.submit(() -> {
        log.debug("task 4 running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task 4 finish");
    });
10:58:11.923 [main] DEBUG c.Test - shutdown
10:58:11.923 [pool-1-thread-2] DEBUG c.Test - task 2 running
10:58:11.923 [pool-1-thread-1] DEBUG c.Test - task 1 running
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1376c05c rejected from java.util.concurrent.ThreadPoolExecutor@51521cc1[Shutting down, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at p8_1.testThreadPool.Test7.main(Test7.java:53)
10:58:12.938 [pool-1-thread-1] DEBUG c.Test - task 1 finish
10:58:12.938 [pool-1-thread-2] DEBUG c.Test - task 2 finish
10:58:12.938 [pool-1-thread-1] DEBUG c.Test - task 3 running
10:58:13.952 [pool-1-thread-1] DEBUG c.Test - task 3 finish

源码 

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(SHUTDOWN);
            // 仅会打断空闲线程
            interruptIdleWorkers();
            onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
        tryTerminate();
    }

2)shutdownNow

线程池状态变为 STOP 

  • 不会接收新任务
  • 会将队列中的任务返回
  • 并用 interrupt 的方式中断正在执行的任务 
List<Runnable> shutdownNow();

测试

    ExecutorService pool = Executors.newFixedThreadPool(2);

    pool.submit(() -> {
        log.debug("task 1 running");
        try {
            TimeUnit.SECONDS.sleep(1);
            log.debug("task 1 finish");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "1";
    });

    pool.submit(() -> {
        log.debug("task 2 running");
        try {
            TimeUnit.SECONDS.sleep(1);
            log.debug("task 2 finish");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "2";
    });

    pool.submit(() -> {
        log.debug("task 3 running");
        try {
            TimeUnit.SECONDS.sleep(1);
            log.debug("task 3 finish");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "3";
    });

    log.debug("shutdownNow");
    List<Runnable> runnables = pool.shutdownNow();
    log.debug("other....{}",runnables.size());
10:53:02.880 [main] DEBUG c.Test8 - shutdownNow
10:53:02.880 [pool-1-thread-1] DEBUG c.Test8 - task 1 running
10:53:02.880 [pool-1-thread-2] DEBUG c.Test8 - task 2 running
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at p8_1.testThreadPool.Test8.lambda$main$1(Test8.java:31)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at p8_1.testThreadPool.Test8.lambda$main$0(Test8.java:21)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
10:53:02.883 [main] DEBUG c.Test8 - other....1

源码

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(STOP);
            // 打断所有线程
            interruptWorkers();
            // 获取队列中剩余任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试终结
        tryTerminate();
        return tasks;
    }

3)其它方法

// 不在 RUNNING 状态的线程池,此方法就返回 true 
boolean isShutdown();

// 线程池状态是否是 TERMINATED 
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,
// 因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

测试awaitTermination

    ExecutorService pool = Executors.newFixedThreadPool(2);

    pool.submit(() -> {
        log.debug("task 1 running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task 1 finish");
    });

    pool.submit(() -> {
        log.debug("task 2 running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task 2 finish");
    });

    pool.submit(() -> {
        log.debug("task 3 running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task 3 finish");
    });

    log.debug("shutdown");
    pool.shutdown();
    pool.awaitTermination(3,TimeUnit.SECONDS);
    log.debug("main-Thread other thing");
11:00:32.136 [main] DEBUG c.Test - shutdown
11:00:32.136 [pool-1-thread-1] DEBUG c.Test - task 1 running
11:00:32.136 [pool-1-thread-2] DEBUG c.Test - task 2 running
11:00:33.140 [pool-1-thread-1] DEBUG c.Test - task 1 finish
11:00:33.140 [pool-1-thread-2] DEBUG c.Test - task 2 finish
11:00:33.140 [pool-1-thread-2] DEBUG c.Test - task 3 running
11:00:34.151 [pool-1-thread-2] DEBUG c.Test - task 3 finish
11:00:34.151 [main] DEBUG c.Test - main-Thread other thing

9、任务调度线程池

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务

    Timer timer = new Timer();
    TimerTask task1 = new TimerTask() {
        @Override
        public void run() {
            log.debug("1");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    TimerTask task2 = new TimerTask() {
        @Override
        public void run() {
            log.debug("2");
        }
    };
    log.debug("start");
    timer.schedule(task1,1000);
    timer.schedule(task2,1000);

注意观察时间 

11:07:15.238 [main] DEBUG c.Test9 - start
11:07:16.251 [Timer-0] DEBUG c.Test9 - 1
11:07:18.266 [Timer-0] DEBUG c.Test9 - 2

使用 ScheduledExecutorService 改写

    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

    pool.schedule(() -> {
        System.out.println("任务1,执行时间:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, 1, TimeUnit.SECONDS);

    pool.schedule(() -> {
        System.out.println("任务2,执行时间:" + new Date());
    }, 2, TimeUnit.SECONDS);
任务1,执行时间:Mon Oct 17 11:09:10 CST 2022
任务2,执行时间:Mon Oct 17 11:09:11 CST 2022

scheduleAtFixedRate

   public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

任务执行时间超过了间隔时间

    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start...");
    pool.scheduleAtFixedRate(() -> {
        log.debug("running...");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s 

11:13:15.104 [main] DEBUG c.Test11 - start...
11:13:16.160 [pool-1-thread-1] DEBUG c.Test11 - running...
11:13:18.172 [pool-1-thread-1] DEBUG c.Test11 - running...
11:13:20.184 [pool-1-thread-1] DEBUG c.Test11 - running...

scheduleWithFixedDelay

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

 测试

    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start...");
    pool.scheduleWithFixedDelay(() -> {
        log.debug("running...");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },1,1,TimeUnit.SECONDS);

输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是上一个任务结束 <-> 延时 <-> 下一个任务开始,所以间隔都是 3s

11:14:44.395 [main] DEBUG c.Test11 - start...
11:14:45.455 [pool-1-thread-1] DEBUG c.Test11 - running...
11:14:48.474 [pool-1-thread-1] DEBUG c.Test11 - running...
11:14:51.486 [pool-1-thread-1] DEBUG c.Test11 - running...

整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务

实现每周四 18:00:00 定时执行任务

    // 获得当前时间
    LocalDateTime now = LocalDateTime.now();
    // 获取周四时间
    LocalDateTime thursday =
            now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);

    // 如果当前时间已经超过本周四 18:00:00.000,那么找下周四 18:00:00.000
    if(now.compareTo(thursday) >= 0){
        thursday = thursday.plusWeeks(1);
    }

    // 计算时间差,即延时执行时间
    long initailDelay = Duration.between(now, thursday).toMillis();

    // 计算间隔时间,即 1 周的毫秒值
    long period = 7 * 24 * 3600 * 1000;
    
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    pool.scheduleAtFixedRate(()->{
        System.out.println("running");
    }, initailDelay, period, TimeUnit.MILLISECONDS);

9、正确处理执行任务异常

方式一:主动捉异常

    ExecutorService pool = Executors.newFixedThreadPool(1);

    pool.execute(() -> {
        try {
            log.debug("task1");
            int i = 1 / 0;
        } catch (Exception e) {
            log.error("error:", e);
        }
    });
11:21:59.668 [pool-1-thread-1] DEBUG c.TestException - task1
11:21:59.673 [pool-1-thread-1] ERROR c.TestException - error:
java.lang.ArithmeticException: / by zero
	at p8_1.testThreadPool.TestException.lambda$main$0(TestException.java:22)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

方法2:使用 Future

    ExecutorService pool = Executors.newFixedThreadPool(1);

    Future<Boolean> f = pool.submit(() -> {
        log.debug("task1");
        int i = 1 / 0;
        return true;
    });
    log.debug("result:{}", f.get());
11:19:56.760 [pool-1-thread-1] DEBUG c.TestException - task1
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at p8_1.testThreadPool.TestException.main(TestException.java:33)
Caused by: java.lang.ArithmeticException: / by zero
	at p8_1.testThreadPool.TestException.lambda$main$0(TestException.java:30)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

10、线程池大小确定

  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿 
  • 过大会导致更多的线程上下文切换,占用更多内存

CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费。

I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下:

  • 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

例如 4 核 CPU 计算时间是 50%,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式

  • 4 * 100% * 100% / 50% = 8

参考文章:线程池源码解析_bullshitter的博客-CSDN博客_线程池源码解析

「超详细」Java线程池源码解析_倾听铃的声的博客-CSDN博客

(1条消息) 深入Java线程池:从设计思想到源码解读_云深i不知处的博客-CSDN博客

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java线程池源码解析及使用 的相关文章

  • Java JUC

    Java JUC 1 Java JUC简介 在 Java 5 0 提供了 java util concurrent xff08 简称 JUC xff09 包 xff0c 在此包中增加了在并发编程中很常用的实用工具类 xff0c 用于定义类似
  • 什么是juc

    juc是用于处理线程的工具包
  • 【Java面试题汇总】多线程、JUC、锁篇(2023版)

    导航 黑马Java笔记 踩坑汇总 Java基础 JavaWeb SSM SpringBoot 瑞吉外卖 SpringCloud 黑马旅游 谷粒商城 学成在线 设计模式 牛客面试题 目录 0 请你说说线程和进程的区别 1 请你说说多线程 2
  • 黑马并发编程JUC总结

    并发编程总结1 并发编程 2 进程和线程 2 1定义 2 2并发和并行 2 3应用 异步调用 并发应用 3 java线程 3 1线程创建 创建线程方法1 创建方法2 Thread和Runable的区别 创建方法3 3 2线程运行 3 3线程
  • JUC详细笔记

    此笔记根据黑马JUC课程整理 1 JUC概述 1 1什么是juc 在 Java 中 线程部分是一个重点 本篇文章说的 JUC 也是关于线程的 JUC就是 java util concurrent 工具包的简称 这是一个处理线程的工具包 JD
  • JUC快速学习笔记

    JUC快速学习笔记 狂神说JUC 个人学习笔记 介绍 JUC是指javaUtil包中的三个操作线程的包 并发操作 不加锁 方法 属性 方法 private int number 50 买票的方式 public void norSale if
  • 黑马并发编程JUC(信号量、线程安全类)总结

    黑马并发编程JUC总结 9 JUC Semaphore 定义 原理 acquire release CountDownLatch 为什么需要用到CountDownLatch 定义 为什么加载的时候需要使用到countDownLock 商品问
  • 【源码】走一遍源码弄清ArrayList容器的扩容机制

    源码 走一遍源码弄清ArrayList容器的扩容机制 首先我们来看看ArraysList容器在整个Java集合框架中所处的位置 由此可见ArrayList是Java集合框架中 两大派系中Collection接口的子接口List的实现类 我们
  • 详解ThreadLocal

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 1 ThreadLocal介绍 1 1 官方介绍 1 2 基本用法 1 2 1 常用方法 1 2 2 使用案例 1 3 ThreadLocal与synchroniz
  • 自定义线程池—学习原理,设计思想,独立实现

    自定义线程池 0 概念与模型 主线程不断地生产任务 直接交付线程执行 当前线程池数量 lt 核心数 否则 加入阻塞任务队列 等到线程池中空闲的线程获取执行 否则 阻塞队列已满 开发接口 拒绝策略 等待 超时等待队列不满时加入队列 放弃任务
  • JUC并发编程之AQS原理

    1 AQS 原理 1 1 概述 全称是 AbstractQueuedSynchronizer 是阻塞式锁和相关的同步器工具的框架 特点 用 state 属性来表示资源的状态 分独占模式和共享模式 子类需要定义如何维护这个生态 控制如何获取锁
  • JUC并发编程之Java线程(二)

    二 Java线程 2 1 创建和运行线程 方法一 Thread创建线程方式 继承Thread类 匿名内部类方式 public class CreateThread01 public static void main String args
  • 安全线程的集合

    1 CopyOnWriteArrayList package com kuang unsafe import java util import java util concurrent CopyOnWriteArrayList java u
  • Java JUC概述

    Java JUC Java Util Concurrent 是 Java 平台提供的并发编程工具包 它提供了一系列的工具类和接口 用于简化多线程编程 JUC 中的类和接口都是基于 Java 平台的底层并发原语 如锁 信号量 原子变量等 实现
  • JUC(2): 阻塞队列+线程池(重点)+新时代程序员必会

    一 阻塞队列 ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列 LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列 PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列 D
  • wait notify正确使用方式

    wait notify正确使用方式 假设 当线程 Thread03 在1 100中找出77并输出后 Thread01 输出所有1 100中的奇数 当线程 Thread03 在1 100中找出88并输出后 Thread02 输出所有1 100
  • 线程安全分析

    1 成员变量和静态变量是否线程安全 如果它们没有被共享 则线程安全 如果它们被共享了 根据它们的状态是否能够改变 又分两种情况 如果只有读操作 则线程安全 如果有读写操作 则这段代码是临界区 需要考虑线程安全 2 局部变量是否线程安全 局部
  • 【JUC并发编程】CopyOnWrite容器详解

    JUC并发编程 CopyOnWrite容器详解 文章目录 JUC并发编程 CopyOnWrite容器详解 一 什么是CopyOnWrite容器 二 CopyOnWriteArrayList 三 CopyOnWrite的业务中实现 一 什么是
  • 第5节 实现Callable 接口

    Java 5 0 在java util concurrent 提供了一个新的创建执行 线程的方式 Callable 接口 Callable 接口类似于Runnable 两者都是为那些其实例可能被另一个线程执行的类设计的 但是 Runnabl
  • JUC并发编程共享模型之管程(三)(中)

    4 5Monitor概念 Java 对象头 以 32 位虚拟机为例 在32位虚拟机中 1个机器码等于4字节 也就是32bit 在64位虚拟机中 1个机器码是8个字节 也就是64bit 普通对象 数组对象 其中Mark Word 结构为 最后

随机推荐

  • ***没有规则可以创建“XXX”需要的目标“XXX”问题的解决方案

    在第4季 上学期 专题2 U Boot新手入门中 1 在Linux中解压uboot tq2440 tar gz 2 tar xvzf uboot tq2440 tar gz 3 进入 uboot tq2440 4 make TQ2440 c
  • 软件加密系统Themida常见问题集锦—Themida是否支持命令行保护?

    Themida是先进的Windows软件保护系统 它被用于满足软件开发人员对于所开发应用程序安全保护的需求 使其远离被先进的逆向工程和软件破解的危险 通过下载Themida 我们集中在软件保护器所具有的主要弱点 从而提供了解决这些问题的完整
  • jdbc oracle多数据源,JdbcTemplate 配置多数据源

    有时候需要对接第三方厂商的数据库或者视图 我们不想让多数据源入侵我们现有的项目 那么可以试下JdbcTemplate 这里以Oracle视图为例 先确定下对方Oracle版本 然后引入对应版本的pom org springframework
  • SNKr:创造新的潮圈文化 将区块链与时尚潮流结合

    直播内容整理 关于SNKr SNKr以 Real Recognize Real 为核心愿景 是一个由区块链赋能的潮流文化生态社区项目 由SWELL公司发起 SNKr致力于连接潮流文化中的 真 玩家与 真 产品 通过loT和区块链技术帮助品牌
  • AcWing110. 防晒

    输入样例 3 2 3 10 2 5 1 5 6 2 4 1 输出样例 2 解析 按照右区间排序 优先满足小的 include
  • python读取图片的几种方式

    opencv的像素值在 0 1 0 1 show的时候转换到 0 255 import cv2 img cv2 imread imgfile cv2 imshow img win name img cv2 waitKey 0 无限期等待输入
  • 不要自称为程序员

    如果有我可以添加到每个工程教育的一门课程 它不涉及编译器或门或时间复杂度 这将是您工业101的现实 因为我们不教他们和许多不必要的痛苦和折磨这个结果 这后立志要为你作为一个年轻的工程师的职业生涯中的自我介绍 填写在您的教育差距 就如何在 现
  • TMPGEnc 4.0 XPress(小日本4)优化安装教程

    小日本4 TMPGEnc 4 0 XPress 是小日本2 54的升级版本 与小日本2 54之间本来还有一个3 0 版本 不过3 0 没有产生太大影响即升级到4 0 版本 尽管是小日本2 54的升级版本 但3 0 以后此软件便属于全新开发
  • VirtualBox安装OpenWRT虚拟机,及Kernel panic - not syncing: Attempted to kill init故障排除

    编译或下载镜像文件 openwrt x86 generic Generic combined ext4 img gz 解压 gunzip d openwrt x86 generic Generic combined ext4 img gz
  • c++构建正态分布的随机数

    最近编程的时候遇到一个问题 需要用c 来产生一个满足正态分布的的随机数 用c 产生一个均匀分布的随机数很容易 但是满足正态分布还是有点懵逼的 然后就在网上搜一些资料 发现有三种方法可以产生正态分布的随机数 但是看别人从理论上的推导 感觉还是
  • node——使用Nginx + Node.js部署你的网站

    Nginx是一个高性能的HTTP和反向代理服务器 反向代理就是通常所说的web服务器加速 它是一种通过在繁忙的web服务器和internet之间增加一个高速的web缓冲服务器来降低实际的web服务器的负载 Nginx由俄罗斯程序员利用C语言
  • python的动态加载的一个注意地方

    先描述一下我的问题背景 然后给出错误发现 最终给出解决办法 1 我有很多python文件 并且这些文件内容会按照一定周期被更新但是文件名字不变 并且每个文件内都有一个一样的class的名字 需要我去动态调用 我的调用方法是使用的python
  • Spring-03 Aop简介,实现原理,基于ProxyFactoryBean实现Aop,基于AspectJ开发的实现

    Spring 03 1 SpringAop简介 AOP的全称是Aspect Oriented Programming 即面向切面编程 也称面向方面编程 它是面向对象编程 OOP 的一种补充 目前已成为一种比较成熟的编程方式 aop 解决的问
  • C语言——白盒测试

    深入理解白盒测试的基本方法 运用基本路径测试法设计测试用例 1 掌握白盒测试技术中基本路径测试法的基本步骤 2 训练针对具体程序运用基本路径测试法设计测试用例的能力 测试代码 DEVcpp 源代码 点击此处可下载 include
  • Android 自动化触发GC

    问题 最近有个小需求 能通过自动化对app进行GC回收 对于app的处理无外乎主动调用System gc 或者使用adb命令直接进行GC回收 解决方法 方法一 在代码里的某个方法调用System gc 如我申明一个receiver 然后通过
  • linux:SecureCRT SSH连接报错 Key exchange failed. No compatible key exchange method

    问题 配置ssh后提示 Key exchange failed No compatible key exchange method The server supports these methods curve25519 sha256 cu
  • 数据分析36计(九):倾向得分匹配法(PSM)量化评估效果分析

    1 因果推断介绍 如今量化策略实施的效果评估变得越来越重要 数据驱动产品和运营 业务等各方的理念越来越受到重视 如今这方面流行的方法除了实验方法AB testing外 就是因果推断中的各种观察研究方法 统计相关性并不意味着因果关系 数据分析
  • 高性能Mysql——一条SQL语句在Mysql中是如何执行的?

    文章目录 MySQL 基本架构概览 Server层介绍 SQL执行过程 查询语句 更新语句 SQL执行过程的日志问题 本篇文章会分析下一个 sql 语句在 MySQL 中的执行流程 包括 sql 的查询在 MySQL 内部会怎么流转 sql
  • Topaz Video AI for Mac 3.3.3

    Topaz Video AI是一款使用人工智能技术对视频进行增强和修复的软件 它可以自动降噪 去除锐化 减少压缩失真 提高清晰度等等 Topaz Video AI可以处理各种类型的视频 包括低分辨率视频 老旧影片 手机录制的视频等等 使用T
  • Java线程池源码解析及使用

    1 线程池的用处 Java 引入 Excutor 框架将任务的提交和执行进行解耦 只需要定义好任务 然后提交给线程池即可 使用线程池的时机 单个任务处理时间比较短 需要处理的任务数量很大 线程池的优点 降低资源消耗 通过重复利用已创建的线程