ThreadPoolExecutor任务提交与停止流程及底层实现

2023-05-16

ThreadPoolExecutor任务提交

executor任务提交流程

通过查看源码可知,JUC下的Excutor接口仅提供了一个可执行方法executor

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

JDK注释中的介绍为“Executes the given command at some time in the future.”,在在未来某个时刻执行给定的命令command,即向线程池中提交任务,在未来某个时刻执行,提交的任务必须要实现Runnable接口,而且void类型导致该提交方式不能获取返回值。

executor的JDK源码如下,给出的介绍:

在未来的某个时刻执行给定的任务。 这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行是这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行。

 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理。

  public void execute(Runnable command) {
         if(command == null) throw new NullPointerException;
         int c = ctl.get();

        /**
         * 1、如果当前线程数少于corePoolSize
         *(可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了)
         */
        if (workerCountOf(c) < corePoolSize) {
            //addWorker()成功,返回
            if (addWorker(command, true))
                return;

            /**
             * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
             * 失败的原因可能是:
             * 1、线程池已经shutdown,shutdown的线程池不再接收新任务
             * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,
             * 别的线程先创建了worker线程,导致workerCount>=corePoolSize
             */
            c = ctl.get();
        }

        /**
         * 2、如果线程池RUNNING状态,且入队列成功
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();//再次校验位

            /**
             * 再次校验放入workerQueue中的任务是否能被执行
             * 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务
             * 2、如果线程池是运行状态,或者从workQueue中删除任务失败
             *(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
             */
            //如果再次校验过程中,线程池不是RUNNING状态,
            //并且remove(command)--workQueue.remove()成功,拒绝当前command
            if (! isRunning(recheck) && remove(command))
                reject(command);
                //如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
                //为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢??
                //只保证有一个worker线程可以从queue中获取任务执行就行了??
                //因为只要还有活动的worker线程,就可以消费workerQueue中的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
            //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
            //第二个参数为true代表占用corePoolSize,false占用maxPoolSize
        }
        /**
         * 3、如果线程池不是running状态 或者 无法入队列
         * 尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,
         * 拒绝当前command
         */
        else if (!addWorker(command, false))
            reject(command);
    }    
}
  1. 首先获取的命令需要非空,若为null报空指针错误
  2. 若运行线程的数量小于corePoolSize,就通过addWorker创建一个新的线程来执行新添加的任务去运行command,command会作为这个线程的第一个任务,并且对addWorker的调用会原子化地检查runState和workerCount。
  3. 如果任务成功的放入队列,仍需要一个双重校验机制去确认是否应该新建一个线程,因为现有的线程在上次检查后死亡或者线程池在进入这个方法后关闭了,所以需要再次检查state,有必要需要回滚队列,如果线程池中没有线程了,就开启一个线程。
  4. 如果无法将任务入队列(可能队满),就尝试添加一个新线程,如果失败了,就说明线程池关闭了或者饱和了因此拒绝任务执行。

线程池执行过程

  1. 如果线程池中的线程数量少于corePoolSize,就创建新的线程来执行新添加的任务
  2. 如果线程池中的线程数量大于等于corePoolSize,但队列workQueue未满,则将新添加的任务放到workQueue中
  3. 如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务
  4. 如果线程池中的线程数量等于了maximumPoolSize,就用RejectedExecutionHandler来执行拒绝策略

线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl这个AtomicInteger的功能很强大,Integer4byte,32bit,其中高3位用于维护线程池运行状态,低29位维护线程池中线程数量

1、RUNNING:-1<<COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务

2、SHUTDOWN:0<<COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务

3、STOP:1<<COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务

4、TIDYING:2<<COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法

5、TERMINATED:3<<COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态

这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。

runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态

workerCountOf(int c)方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量

ctlOf(int rs, int wc)方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl(原子操作类)

  addWorker添加worker线程

/**
 * 检查根据当前线程池的状态和给定的边界(core or maximum)是否可以创建一个新的worker
 * 如果是这样的话,worker的数量做相应的调整,如果可能的话,创建一个新的worker并启动,
 * 参数中的firstTask作为worker的第一个任务
 * 如果方法返回false,可能因为pool已经关闭或者调用过了shutdown
 * 如果线程工厂创建线程失败,也会失败,返回false
 * 如果线程创建失败,要么是因为线程工厂返回null,要么是发生了OutOfMemoryError
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    //外层循环,负责判断线程池状态
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); //状态
        /**
         * 线程池的state越小越是运行状态,runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
         * 1、如果线程池state已经至少是shutdown状态了
         * 2、并且以下3个条件任意一个是false
         *   rs == SHUTDOWN         
         *   (隐含:rs>=SHUTDOWN)false情况: 线程池状态已经超过shutdown,
         *   可能是stop、tidying、terminated其中一个,即线程池已经终止
         *   
         *   firstTask == null      
         *  (隐含:rs==SHUTDOWN)false情况: 
         *    firstTask不为空,rs==SHUTDOWN 且 firstTask不为空,return false,
         *    场景是在线程池已经shutdown后,还要添加新的任务,拒绝
         *
         *   ! workQueue.isEmpty()  
         * (隐含:rs==SHUTDOWN,firstTask==null)false情况: workQueue为空,
         *  当firstTask为空时是为了创建一个没有任务的线程,再从workQueue中获取任务,
         *  如果workQueue已经为空,那么就没有添加新worker线程的必要了
         *
         * return false,即无法addWorker()
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        
        if(rs != SHUTDOWN) //大于等于shutdown的状态,返回false
        if(firstTask != null)  是新提交的任务,在shutdown状态及之后,都不会执行,返回false
        if(workQueue.isEmpty()) 如果队列为空,就没有必要新增加线程worker   
 
        //内层循环,负责worker数量+1
        for (;;) {
            int wc = workerCountOf(c); //worker数量
             
            //如果worker数量>线程池最大上限CAPACITY(即使用int低29位可以容纳的最大值)
            //或者( worker数量>corePoolSize 或  worker数量>maximumPoolSize ),即已经超过了给定的边界
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
             
            //调用unsafe CAS操作,使得worker数量+1,成功则跳出retry循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
             
            //CAS worker数量+1失败,再次读取ctl
            c = ctl.get();  // Re-read ctl
             
            //如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
            // else CAS失败时因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
        }
    }
 
    /**
     * worker数量+1成功的后续操作
     * 添加到workers Set集合,并启动worker线程
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock; 
        w = new Worker(firstTask); //1、设置worker这个AQS锁的同步状态state=-1
                                   //2、将firstTask设置给worker的成员变量firstTask
                                   //3、使用worker自身这个runnable,调用ThreadFactory创建一个线程,并设置给worker的成员变量thread
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                //--------------------------------------------这部分代码是上锁的
                // 当获取到锁后,再次检查
                int c = ctl.get();
                int rs = runStateOf(c);
 
                //如果线程池在运行running<shutdown 或者 线程池已经shutdown,
                //且firstTask==null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                //worker数量-1的操作在addWorkerFailed()
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable   线程已经启动,抛非法线程状态异常
                        throw new IllegalThreadStateException();
                     
                    workers.add(w);//workers是一个HashSet<Worker>
                     
                    //设置最大的池大小largestPoolSize,workerAdded设置为true
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
              //--------------------------------------------
            } 
            finally {
                mainLock.unlock();
            }
             
            //如果往HashSet中添加worker成功,启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果启动线程失败
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker(Runnable firstTask, boolean core)

参数:

firstTask: worker线程的初始任务,可以为空

core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限

addWorker方法有4种传参的方式:

1、addWorker(command, true)

2、addWorker(command, false)

3、addWorker(null, false)

4、addWorker(null, true)

在execute方法中就使用了前3种,结合这个核心方法进行以下分析

第一种:线程数小于corePoolSize时,放一个需要处理的task进Workers Set,如果Workers Set长度超过corePoolSize,就返回false

第二种:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false

第三种:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务

第四种:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads\(\)方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行

执行流程:

1、判断线程池当前是否为可以添加worker线程的状态,可以则继续下一步,不可以return false:

A、线程池状态 > shutdown,可能为stop、tidying、terminated,不能添加worker线程

B、线程池状态==shutdown,firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务

C、线程池状态==shutdown,firstTask==null,workQueue为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取task,而workQueue为空,说明添加无任务线程已经没有意义

2、线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),超过了return false,没超过则对workerCount+1,继续下一步

3、在线程池的ReentrantLock保证下,向Workers Set中添加新创建的worker实例,添加完成后解锁,并启动worker线程,如果这一切都成功了,return true,如果添加worker入Set失败或启动失败,调用addWorkerFailed()逻辑

 

内部类Worker

/**
 * Worker类大体上管理着运行线程的中断状态 和 一些指标
 * Worker类投机取巧的继承了AbstractQueuedSynchronizer来简化在执行任务时的获取、释放锁
 * 这样防止了中断在运行中的任务,只会唤醒(中断)在等待从workQueue中获取任务的线程
 * 解释:
 *   为什么不直接执行execute(command)提交的command,而要在外面包一层Worker呢??
 *   主要是为了控制中断
 *   用什么控制??
 *   用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁
 *   只有在等待从workQueue中获取任务getTask()时才能中断
 *
 * worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁
 * 因为我们不想让在调用比如setCorePoolSize()这种线程池控制方法时可以再次获取锁(重入)
 * 解释:
 *   setCorePoolSize()时可能会interruptIdleWorkers(),在对一个线程interrupt时会要w.tryLock()
 *   如果可重入,就可能会在对线程池操作的方法中中断线程,类似方法还有:
 *   setMaximumPoolSize()
 *   setKeppAliveTime()
 *   allowCoreThreadTimeOut()
 *   shutdown()
 * 此外,为了让线程真正开始后才可以中断,初始化lock状态为负值(-1),在开始runWorker()时将state置为0,而state>=0才可以中断
 * 
 * Worker继承了AQS,实现了Runnable,说明其既是一个可运行的任务,也是一把锁(不可重入)
 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread; //利用ThreadFactory和 Worker这个Runnable创建的线程对象
     
    Runnable firstTask;
     
    volatile long completedTasks;
 
    Worker(Runnable firstTask) {
        //设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
        setState(-1);
        // 在调用runWorker()前,禁止interrupt中断,
        //在interruptIfStarted()方法中会判断 getState()>=0
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); 
        //根据当前worker创建一个线程对象
       //当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,
       //而是调用当前worker.run()时调用firstTask.run()
    }
 
    public void run() {
        runWorker(this); 
        //runWorker()是ThreadPoolExecutor的方法
    }
 
    // Lock methods
    // The value 0 represents the unlocked state. 0代表“没被锁定”状态
    // The value 1 represents the locked state. 1代表“锁定”状态
 
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
 
    /**
     * 尝试获取锁
     * 重写AQS的tryAcquire(),AQS本来就是让子类来实现的
     */
    protected boolean tryAcquire(int unused) {
        //尝试一次将state从0设置为1,即“锁定”状态,
        //但由于每次都是state 0->1,而不是+1,那么说明不可重入
        //且state==-1时也不会获取到锁
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread()); 
            //设置exclusiveOwnerThread=当前线程
            return true;
        }
        return false;
    }
 
    /**
     * 尝试释放锁
     * 不是state-1,而是置为0
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null); 
        setState(0);
        return true;
    }
 
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
 
    /**
     * 中断(如果运行)
     * shutdownNow时会循环对worker线程执行
     * 且不需要获取worker锁,即使在worker运行时也可以中断
     */
    void interruptIfStarted() {
        Thread t;
        //如果state>=0、t!=null、且t没有被中断
        //new Worker()时state==-1,说明不能中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer,所以其既是一个可执行的任务,又可以达到锁的效果

new Worker()

1、将AQS的state置为-1,在runWoker()前不允许中断

2、待执行的任务会以参数传入,并赋予firstTask

3、用Worker这个Runnable创建Thread

之所以Worker自己实现Runnable,并创建Thread,在firstTask外包一层,是因为要通过Worker控制中断,而firstTask这个工作任务只是负责执行业务

Worker控制中断主要有以下几方面:

1)、初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断

不允许中断体现在:

  1. shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state=-1时tryLock()失败,没办法中断interrupt()
  2. shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>=0才能interrupt的逻辑,而初始state=-1

interruptIfStarted()源码如下,需要getState()>=0才能进行中断 

//中断已经执行runworker的工作线程
void interruptIfStarted() {
    Thread t;
    //w.unlock:state=0;w.lock:state=1,在执行到runWorker,w.unlock时就可以中断了
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

2)为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程。Worker实现的AQS为不可重入锁,为了是在获得worker锁的情况下再进入其它一些需要加锁的方法

runWorker():执行任务

  /**
     * 重复的从队列中获取任务并执行,同时应对一些问题:
     * <p>
     * 1、我们可能使用一个初始化任务开始,即firstTask为null
     * 然后只要线程池在运行,我们就从getTask()获取任务
     * 如果getTask()返回null,则worker由于改变了线程池状态或参数配置而退出
     * 其它退出因为外部代码抛异常了,这会使得completedAbruptly为true,这会导致在processWorkerExit()方法中替换当前线程
     * <p>
     * 2在任何任务执行之前,都需要对worker加锁去防止在任务运行时,其它的线程池中断操作
     * clearInterruptsForTaskRun保证除非线程池正在stoping,线程不会被设置中断标示
     * <p>
     * 3.每个任务执行前会调用beforeExecute(),其中可能抛出一个异常,这种情况下会导致线程die(跳出循环,且completedAbruptly==true),没有执行任务
     * 因为beforeExecute()的异常没有cache住,会上抛,跳出循环
     * <p>
     * 4. 假定beforeExecute()正常完成,我们执行任务
     * 汇总任何抛出的异常并发送给afterExecute(task, thrown)
     * 因为我们不能在Runnable.run()方法中重新上抛Throwables,我们将Throwables包装到Errors上抛(会到线程的UncaughtExceptionHandler去处理)
     * 任何上抛的异常都会导致线程die
     * <p>
     * 5. 任务执行结束后,调用afterExecute(),也可能抛异常,也会导致线程die
     * 根据JLS Sec 14.20,这个异常(finally中的异常)会生效
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        // new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,
        // 将state置为0, 而interruptIfStarted()中只有state>=0才允许调用中断
        boolean completedAbruptly = true; 
        //是否“突然完成”,如果是由于异常导致的进入finally,那么completedAbruptly==true就是突然完成的
        try {
        /**
         * 如果task不为null,或者从阻塞队列中getTask()不为null
         */
            while (task != null || (task = getTask()) != null) {
                w.lock(); 
                //上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的worker
                
                /**
                * clearInterruptsForTaskRun操作
                * 确保只有在线程stoping时,才会被设置中断标示,否则清除中断标示
                * 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
                * 2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=stop
                * 是,再次设置中断标示,wt.interrupt()
                * 否,不做操作,清除中断标示后进行后续步骤
                */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt(); //当前线程调用interrupt()中断

                try {
                    //执行前(子类实现)
                    beforeExecute(wt, task);

                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        //执行后(子类实现)
                        afterExecute(task, thrown); 
                        //这里就考验catch和finally的执行顺序了,因为要以thrown为参数
                    }
                } finally {
                    task = null; //task置为null
                    w.completedTasks++; //完成任务数+1
                    w.unlock(); //解锁
                }
            }

            completedAbruptly = false;
        } finally {
            //处理worker的退出
            processWorkerExit(w, completedAbruptly);
        }
    }
}

runWorker(Worker w)

执行流程:

1、Worker线程启动后,通过Worker类的run()方法调用runWorker(this)

2、执行任务之前,首先worker.unlock(),将AQS的state置为0,允许中断当前worker线程

3、开始执行firstTask,调用task.run(),在执行任务前会上锁wroker.lock(),在执行完任务后会解锁,为了防止在任务运行时被线程池一些中断操作中断

4、在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法

5、无论在beforeExecute()、task.run()、afterExecute()发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程

6、如正常执行完当前task后,会通过getTask()从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程

getTask():获取任务

  /**
     *以下情况会返回null
     * 1. 超过了maximumPoolSize设置的线程数量(因为调用了setMaximumPoolSize())
     * 2. 线程池被stop
     * 3. 线程池被shutdown,并且workQueue空了
     * 4. 线程等待任务超时
     *
     * @return 返回null表示这个worker要结束了,这种情况下workerCount-1
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        /**
         * 外层循环
         * 用于判断线程池状态
         */
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * 对线程池状态的判断,两种情况会workerCount-1,并且返回null
             * 线程池状态为shutdown,且workQueue为空(反映了shutdown状态的线程池还是要执行workQueue中剩余的任务的)
             * 线程池状态为stop(shutdownNow()会导致变成STOP)(此时不用考虑workQueue的情况)
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount(); //循环的CAS减少worker数量,直到成功
                return null;
            }

            boolean timed;      // Are workers subject to culling?
            // 是否需要定时从workQueue中获取

            /**
             * 内层循环
             * 要么break去workQueue获取任务
             * 要么超时了,worker count-1
             */
            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize; //allowCoreThreadTimeOut默认为false
                //如果allowCoreThreadTimeOut为true,说明corePoolSize和maximum都需要定时

                //如果当前执行线程数<maximumPoolSize,并且timedOut 和 timed 任一为false,
                // 跳出循环,开始从workQueue获取任务
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;

                /**
                 * 如果到了这一步,说明要么线程数量超过了maximumPoolSize(可能maximumPoolSize被修改了)
                 * 要么既需要计时timed==true,也超时了timedOut==true
                 * worker数量-1,减一执行一次就行了,然后返回null,在runWorker()中会有逻辑减少worker线程
                 * 如果本次减一失败,继续内层循环再次尝试减一
                 */
                if (compareAndDecrementWorkerCount(c))
                    return null;

                //如果减数量失败,再次读取ctl
                c = ctl.get();  // Re-read ctl

                //如果线程池运行状态发生变化,继续外层循环
                //如果状态没变,继续内层循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                //poll() - 使用  LockSupport.parkNanos(this, nanosTimeout) 
                // 挂起一段时间,interrupt()时不会抛异常,但会有中断响应
                //take() - 使用 LockSupport.park(this) 挂起,interrupt()时不会抛异常,但会有中断响应
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    
                        //大于corePoolSize
                        workQueue.take();                                        
                        //小于等于corePoolSize

                //如获取到了任务就返回
                if (r != null)
                    return r;

                //没有返回,说明超时,那么在下一次内层循环时会进入worker count减一的步骤
                timedOut = true;
            }
            /**
             * blockingQueue的take()阻塞使用LockSupport.park(this)进入wait状态的,
             * 对LockSupport.park(this)进行interrupt不会抛异常,但还是会有中断响应
             * 但AQS的ConditionObject的await()对中断状态做了判断,会报告中断状态 
             * reportInterruptAfterWait(interruptMode)
             * 就会上抛InterruptedException,在此处捕获,重新开始循环
             * 如果是由于shutdown()等操作导致的空闲worker中断响应,在外层循环判断状态时,可能return null
             */
            catch (InterruptedException retry) {
                timedOut = false; //响应中断,重新开始,中断状态会被清除
            }
        }
    }

执行流程:

1、首先判断是否可以满足从workQueue中获取任务的条件,不满足return null

    A、线程池状态是否满足:

        (a)shutdown状态 + workQueue为空或stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了

        (b)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回

    B、线程数量是否超过maximumPoolSize或获取任务是否超时

        (a)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功不会超过maximumPoolSize

        (b)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize

2、如果满足获取任务条件,根据是否需要定时获取调用不同方法:

    A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null

    B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务

3、在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程

Worker和Task的区别:

Worker是线程池中的线程,而Task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。

processWorkerExit():worker线程退出

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1、worker数量-1
     * 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,
     *那么正在工作的worker线程数量需要-1
     * 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
     */
    if (completedAbruptly) 
        // If abrupt, then workerCount wasn't adjusted 代码和注释正好相反啊
        decrementWorkerCount();
 
    /**
     * 2、从Workers Set中移除worker
     */
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数
        workers.remove(w); //从HashSet<Worker>中移除
    } finally {
        mainLock.unlock();
    }
 
    /**
     * 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
     * 主要是判断线程池是否满足终止的状态
     * 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
     * 没有线程了,更新状态为tidying->terminated
     */
    tryTerminate();
 
    /**
     * 4、是否需要增加worker线程
     * 线程池状态是running 或 shutdown
     * 如果当前线程是突然终止的,addWorker()
     * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
     * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
     */
    int c = ctl.get();
    //如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个worker
    if (runStateLessThan(c, STOP)) {
        //不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker()
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize
             
            //如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
             
            //如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
         
        //添加一个没有firstTask的worker
        //只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态
        addWorker(null, false);
    }
}

processWorkerExit(Worker w, boolean completedAbruptly)

worker:要结束的worker

completedAbruptly:是否突然完成(是否因为异常退出)

执行流程:

1、worker数量-1

    A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1

    B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了

2、从Workers Set中移除worker,删除时需要上锁mainlock

3、tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:

    判断线程池是否满足终止的状态

    A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程

    B、没有线程了,更新状态为tidying->terminated

4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程

    线程池状态是running 或 shutdown

    A、如果当前线程是突然终止的,addWorker()

    B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()

    故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程

ThreadPoolExecutor线程池终止

终止线程池主要有两个方法:shutdown() 和 shutdownNow()。

shutdown方法

shutdown()后线程池将变成shutdown状态,此时不接收新任务,但会处理完正在运行的和在阻塞队列中等待处理的任务

/**
     * 开始一个有序的关闭,在关闭中,之前提交的任务会被执行(包含正在执行的,在阻塞队列中的),但新任务会被拒绝
     * 如果线程池已经shutdown,调用此方法不会有附加效应
     * <p>
     * 当前方法不会等待之前提交的任务执行结束,可以使用awaitTermination()
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); //上锁

        try {
            //判断调用者是否有权限shutdown线程池
            checkShutdownAccess();

            //CAS+循环设置线程池状态为shutdown
            advanceRunState(SHUTDOWN);

            //中断所有空闲线程
            interruptIdleWorkers();

            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock(); //解锁
        }

        //尝试终止线程池
        tryTerminate();
    }

shutdown()执行流程:

1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

2、判断调用者是否有权限shutdown线程池

3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务

4、中断所有空闲线程 interruptIdleWorkers()

5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理

6、解锁

7、尝试终止线程池 tryTerminate()

可以看到shutdown()方法最重要的几个步骤是:更新线程池状态为shutdown中断所有空闲线程tryTerminated()尝试终止线程池

interruptIdleWorkers() 中断空闲线程过程如下:

    /**
     * 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续
     * <p>
     * onlyOne如果为true,最多interrupt一个worker * 只有当终止流程已经开始,
     * 但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用
     * (终止流程已经开始指的是:shutdown状态 且 workQueue为空,或者 stop状态)
     * 在这种情况下,最多有一个worker被中断,为了传播shutdown信号,以免所有的线程都在等待
     * 为保证线程池最终能终止,这个操作总是中断一个空闲worker
     * 而shutdown()中断所有空闲worker,来保证空闲线程及时退出
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); //上锁 
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne) break;
            }
        } finally {
            mainLock.unlock(); //解锁 } }
        }
    }

首先获取mainLock锁,因为要迭代workers set,在中断每个worker前,需要做两个判断:

1、线程是否已经被中断,是就什么都不做

2、worker.tryLock() 是否成功

第二个判断比较重要,因为Worker类除了实现了可执行的Runnable,也继承了AQS,本身也是一把锁,tryLock()调用了Worker自身实现的tryAcquire()方法,此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false,这也是AQS规定子类需要实现的尝试获取锁的方法

protected boolean tryAcquire(int unused) {
	if (compareAndSetState(0, 1)) {
		setExclusiveOwnerThread(Thread.currentThread());
		return true;
	}
	return false;
}

tryAcquire()先尝试将AQS的state从0-->1,返回true代表上锁成功,并设置当前线程为锁的拥有者。可以看到compareAndSetState(0, 1)只尝试了一次获取锁,且不是每次state+1,而是0-->1,说明锁不是可重入的。

worker.tryLock()获取worker锁

Woker类可以控制线程中断,因此在runWorker()方法中每次获取到task,task.run()之前都需要worker.lock()上锁,运行结束后解锁,即正在运行任务的工作线程都是上了worker锁的

shutdownNow方法

shutdownNow()后线程池将变成stop状态,此时不接收新任务,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程

    /**
     * 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
     * 这个任务列表是从任务队列中排出(删除)的
     * <p>
     * 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
     * <p>
     * 除了尽力尝试停止运行中的任务,没有任何保证
     * 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束
     */
    public List <Runnable> shutdownNow() {
        List <Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); //上锁

        try {
            //判断调用者是否有权限shutdown线程池
            checkShutdownAccess();

            //CAS+循环设置线程池状态为stop
            advanceRunState(STOP);

            //中断所有线程,包括正在运行任务的
            interruptWorkers();

            tasks = drainQueue(); 
            //将workQueue中的元素放入一个List并返回
        } finally {
            mainLock.unlock(); 
            //解锁
        }

        //尝试终止线程池
        tryTerminate();

        return tasks; 
        //返回workQueue中未执行的任务
    }

shutdownNow() 和 shutdown()的大体流程相似,差别是:

1、将线程池更新为stop状态

2、调用interruptWorkers()中断所有线程,包括正在运行的线程

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
        }finally {
        mainLock.unlock();
    }
}

interruptWorkers先对mainLock加锁,然后循环调用interruptIfStarted方法中断Worker的所有线程,其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()。需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束

3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ThreadPoolExecutor任务提交与停止流程及底层实现 的相关文章

随机推荐

  • Redis系列学习2-五大类型(String,List,Hash,Set,ZSet)及其常规操作

    Redis的基本操作 Redis默认是有16个数据库 xff0c 默认使用的是第0个数据库 xff0c 可以通过select 切换数据库 xff0c Redis的命令大小写不敏感的 切换数据库 切换数据库 格式 xff1a select i
  • Redis系列学习3-geospatial地理空间

    geospatial 地理空间 可以用来实现定位 附近的人 打车APP上距离计算 距离的实现主要基于经纬度 xff0c 城市的经纬度查询 xff1a http www jsons cn lngcode geoadd 添加地址位置 格式 xf
  • 遗传算法求解TSP旅行商问题

    旅行商问题 旅行商问题 traveling salesman problem TSP 可描述为 已知N个城市之间的相互距离 现有一个商人必须遍访这N个城市 并且每个城市只能访问一次 最后又必须返回出发城市 如何安排他对这些城市的访问次序 使
  • 剑指Offer-面试算法题

    1 二分查找 xff08 递归与非递归实现 xff09 基本算法 xff0c 掌握好循环条件 package com company Description 二分查找 xff08 递归与非递归实现 xff09 Created by Wanb
  • Python爬虫-抓取PC端网易云音乐评论(GUI界面)

    歌曲搜素 网易云音乐网址为 xff1a https music 163 com 思路是进入后输入一个歌曲名 xff0c 点击搜索按钮 xff0c 通过开发者调试工具捕获搜索请求 xff0c 捕获到的数据信息如下 xff1a 所有的歌曲相关信
  • Package cmake is not available, but is referred to by another package.

    inux环境下安装Cmake报错 xff1a Package cmake is not available but is referred to by another package This may mean that the packa
  • 完美数问题

    题目描述 对于一个十进制正整数 xff0c 如果z的每一位数字只可能是1 2 3中的其中一个 xff0c 则称 是完美数 如 123 1 3321都是完美数而5 1234则不是 牛牛想写一个函数f n xff0c 使得其返回最大的不大于n的
  • 围圈抽牌报数问题

    问题描述 米免参加公司司建 xff0c 100个同事围坐圈 xff0c 裁判开始顺时针从头发牌 xff0c 每发3张白牌就会发出1张黑 牌 xff0c 抽到黑牌的人出局 xff0c 每局第N个抽到黑牌的将获得奖励 问如果米免想获得奖品 xf
  • RTX30系列-Ubuntu系统配置与深度学习环境Pytorch配置

    本文完成RTX3090Windows 43 Ubuntu双系统配置 xff0c 并配置深度学习环境 硬件环境为RTX3090 43 Z590主板 xff0c 64GB RAM xff0c 2TB固态 xff0c 8TB存储 Ubuntu系统
  • 【rotors】多旋翼无人机仿真(一)——搭建rotors仿真环境

    rotors 多旋翼无人机仿真 xff08 一 xff09 搭建rotors仿真环境 rotors 多旋翼无人机仿真 xff08 二 xff09 设置飞行轨迹 rotors 多旋翼无人机仿真 xff08 三 xff09 SE3控制 roto
  • JVM内存管理

    JVM内存管理 Java 虚拟机在执行 Java 程序的过程中会把它管理的内存划分成若干个不同的数据区域 xff0c JDK 1 8 和之前的版本的数据区域有所差异 xff0c JDK1 6如下图所示 图片来源 xff1a JavaGuid
  • AQS、Semaphore、CountDownLatch与CyclicBarrier原理及使用方法

    AQS AQS 的全称为 AbstractQueuedSynchronizer xff0c 翻译过来的意思就是抽象队列同步器 这个类在 java util concurrent locks 包下面 xff0c AQS 就是一个抽象类 xff
  • 滑动窗口框架算法

    最长覆盖子串 xff0c 异位词 xff0c 最长无重复子串等等许多子串问题用常规暴力法费时费力 xff0c 一些大佬的解法虽然很强效率很高 xff0c 但是太难想到了 xff0c 这类问题用滑动窗口算法解决非常的快捷简便 滑动窗口算法思想
  • Python-深度学习常用脚本

    记录一些因为在网络训练 xff0c 测试过程中经常用到的一些脚本 1 视频按帧提取 可以从一段视频中截取不同帧的图片 xff0c 并保存至文件夹 需要自己更改视频路径和图片保存路径 import os import cv2 import s
  • Java面试基础(一)

    1 重载与重写 重载就是同样的一个方法能够根据输入数据的不同 xff0c 做出不同的处理 重写就是当子类继承自父类的相同方法 xff0c 输入数据一样 xff0c 但要做出有别于父类的响应时 xff0c 你就要覆盖父类方法不同类型的对象 x
  • 网络篇-传输控制协议TCP

    TCP协议 传输控制协议 xff08 TCP xff0c Transmission Control Protocol xff09 用一句话概括的话 xff0c 它是一种面向连接的 可靠的 基于字节流的传输层通信协议 TCP xff08 传输
  • 阻塞队列-BlockingQueue

    对于Queue而言 xff0c BlockingQueue是主要的线程安全的版本 xff0c 具有阻塞功能 xff0c 可以允许添加 删除元素被阻塞 xff0c 直到成功为止 xff0c blockingqueue相对于Queue而言增加了
  • 线程池-ThreadPoolExecutor

    如果并发的线程数量很多 xff0c 并且每个线程都是执行一个时间很短的任务就结束了 xff0c 这样频繁创建线程就会大大降低系统的效率 xff0c 因为频繁创建线程和销毁线程需要时间 那么有没有一种办法使得线程可以复用 xff0c 就是执行
  • MySQL索引

    基础知识 索引是创建在表上的 xff0c 对数据库表中一列或多列的值进行排序的一种结构 xff0c 可以提高查询的速度 通俗的来说 xff0c 数据库中存储的数据比作字典的话 xff0c 索引就相当于是字典中的目录 如果没有索引 xff0c
  • ThreadPoolExecutor任务提交与停止流程及底层实现

    ThreadPoolExecutor任务提交 executor任务提交流程 通过查看源码可知 xff0c JUC下的Excutor接口仅提供了一个可执行方法executor public interface Executor Execute