Java自定义线程池详解及代码实现(非直接调用ThreadPoolExecutor)

2023-05-16

JJava自定义线程池详解及代码实现【非直接调用ThreadPoolExecutor】

      • JDK中的线程池函数ThreadPoolExecutor
      • JDK中的线程池执行任务时的流程
      • 自定义线程池业务分析
      • 自定义线程池的代码实现-注释详尽
        • 1.定义阻塞队列
        • 2.定义线程池以及线程对象内部类
        • 3.定义拒绝策略,只定义接口,之后策略由调用者传入。
      • 自定义线程池代码测试

要实现自定义的线程池,首先得了解线程池的工作流程。
我们可以参考JDK中自定的线程池工作流程去理解,并实现其简化版本。

JDK中的线程池函数ThreadPoolExecutor

JDK中实现线程池的函数如下,其中包含了7个参数。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

这7个参数的含义如下:
在这里插入图片描述

  • corePoolSize表示线程池中的核心线程数,核心线程为常驻线程池中的线程的个数。
  • maximumPoolSize表示最大线程数,其中救急线程个数等于maximumPoolSize - corePoolSize,救急线程为线程池核心线程已满,并且阻塞队列已满时使用的线程,救急线程并不会常驻线程池,其有空闲存活时间,可以通过之后的参数设定。
  • keepAliveTime表示救急线程空闲存活时间,当救急线程没有任务时,等待keepAliveTime之后还没有任务,则消亡。
  • unit 的时间单位。
  • workQueue阻塞队列,当线程池中的核心线程都在工作时,之后来的任务会让在队列中等待执行。
  • threadFactory 创建线程的工厂,一般用来为线程设定可辨识的名字。
  • handler拒绝策略,当线程池无法执行之后来的任务的处理策略。一般包括 1)AbortPolicy 让调用者抛出RejectedExecutionException异常,这是默认策略 2)CallerRunsPolicy 让调用者运行任务 3)DiscardPolicy 放弃本次任务 4)DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之。我们也可以实现自己拒绝策略: 1) 死等 2) 带超时等待 3) 让调用者放弃任务执行 4) 让调用者抛出异常 5) 让调用者自己执行任务等。

JDK中的线程池执行任务时的流程

当我们调用JDK中的线程池执行任务时,其流程一般如下:

  1. 当我们想向线程池提交任务时,如果线程池的线程数中小于核心线程数时,线程池新建核心线程,任务交给核心线程执行。
  2. 如果线程池中线程数等于核心线程数,则将后来的任务放入到阻塞队列中,等待核心线程执行完任务之后,从队列中取任务执行。
  3. 当核心线程都在工作,并且阻塞队列已满时,则创建救急线程,可创建的救急线程数为maximumPoolSize - corePoolSize,将任务交给救急线程执行。
  4. 当核心线程已满、阻塞队列已满,救急线程已满时,则执行根据所提供的拒绝策略对后来的任务进行处理。

自定义线程池业务分析

我们可以对上述流程进行分析,简化处理去除掉救急线程,来完成自定义的线程池。

  1. 定义阻塞队列。通过对线程池的业务分析,我们可以发现,当线程池中的线程都在工作时,后续的任务需要放置到阻塞队列中。之后线程池中的线程都是从阻塞队列中获取任务执行,新来的任务也是被放置到阻塞队列中。此处的工作方式,为生产者消费者模式。因为阻塞队列为多线程中的共享资源,所以需要加锁以确保共享资源的安全性。
  2. 阻塞队列的实现分析。首先需要定义队列容量,定义双端队列来存储任务,此处任务类型可定义为泛型。之后需要定义两种方法,分别是任务存入队列的方法和从队列中获取任务的方法。在存和取得过程中使用可重入锁进行加锁判断。在取任务的过程中,加锁,如果过队列为空,则阻塞等待,也可以设置为带超时的阻塞等待,后续会附上代码,否则直接从队列中返回任务对象。在存任务的过程中,加锁,如果队列为满,则阻塞等待,也可以设置为带超时的阻塞等待,如果不为空,则存入。具体见之后的代码实现。
  3. 定义线程池。首先需要设定核心线程数的大小,定义集合对象存储已经创建的线程,方便之后根据集合对象获取已创建的线程的个数。当新来任务时,如果集合的大小小于核心线程数则新建线程,执行任务。否则当核心线程已经全部工作时,需要将新来的任务放入阻塞队列中,等待线程执行完毕从阻塞队列中获取任务,相当于生产者与消费者模式中的消费者。
  4. 定义线程对象。该线程对象继承自Thread类,并重写其run()方法。该线程对象即为线程池中的线程。我们在其run()方法中,编写线程执行任务的过程,任务我们定义为Runnable对象,通过调用在线程对象中run()方法中调用Runnable对象的run方法,执行任务,当执行完毕时候,该线程对象继续尝试从阻塞队列中获取任务,可以阻塞获取(即一直等待,有任务就执行,没任务就等待),或者超时等待(当超时之后,直接放弃获取)。
  5. 定义拒绝策略。拒绝策略通过使用策略模型实现,我们只定义拒绝策略的接口,具体逻辑通过调用者实现。拒绝策略的使用情况。当阻塞队列已满时,如果还有新来的任务,则使用拒绝策略进行处理。当队列未满则直接添加。

以上就是自定义线程池所需要的对象方法,接下来我们使用Java代码一一实现。

自定义线程池的代码实现-注释详尽

我们将任务定义为Runnable对象,线程池中的线程对象为Thread的子类,这样就可以将Runnable对象,传给Thread进行处理。当然其他实现也可以,可以自行尝试。

1.定义阻塞队列

class BlockingQueue<T>{
    // 1.任务队列, 双向队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2.锁
    private ReentrantLock lock = new ReentrantLock();

    // 3.生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4.消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5.容量
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // 超时阻塞获取任务
    public T pull(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 将超时时间统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            // 取任务的时候,如果为空则需要等待
            while(queue.isEmpty()){
                // 超时的情况直接返回null
                if(nanos <= 0){
                    return null;
                }
                // 返回的是剩余的时间
                nanos = emptyWaitSet.awaitNanos(nanos);
            }
            T t = queue.removeFirst();
            // 唤醒放入的线程
            fullWaitSet.signal();
            return t;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取任务
    public T take(){
        // 加锁
        lock.lock();
        try{
            // 取任务的时候,如果为空则需要等待
            while(queue.isEmpty()){
                emptyWaitSet.await();
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的阻塞添加任务
    public boolean offer(T task, long timeout, TimeUnit timeUnit){
        lock.lock();
        try{
            long nanos = timeUnit.toNanos(timeout);
            // 添加任务时,如果队列已满则需要等待
            while(queue.size()==capacity ){
                System.out.println(task.toString() + " 等待加入任务队列" );
                if(nanos<=0){
                    return false;
                }
                nanos = fullWaitSet.awaitNanos(nanos);
            }
            queue.addLast(task);
            System.out.println("任务【" + task.toString() +  "】加入队列 " );
            emptyWaitSet.signal();
            return true;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 阻塞添加任务
    public void put(T task){
        lock.lock();
        try{
            while(queue.size()==capacity){
                System.out.println(task.toString() + " 等待加入任务队列" );
                fullWaitSet.await();
            }
            queue.addLast(task);
            System.out.println("任务【" + task.toString() +  "】加入队列 " );
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 获取队列大小
    public int size(){
       lock.lock();
       try{
           return queue.size();
       }finally {
           lock.unlock();
       }
    }
    
    // 为使用拒绝策略所添加的向队列中添加任务的方法
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 队列已满
            if(queue.size()==capacity){
                rejectPolicy.reject(this,task);
            }else{ // 有空闲
                queue.addLast(task);
                System.out.println("任务【" + task.toString() +  "】加入队列 " );
                emptyWaitSet.signal();
            }

        }finally {
            lock.unlock();
        }
    }
}

2.定义线程池以及线程对象内部类

class ThreadPool{
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务的超时时间,时间单位,当从队列中获取超时时,放弃获取
    private long timeout;
    private TimeUnit timeUnit;

    // 拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;

    // 线程池传入任务的方法
    public void execute(Runnable task){
        // 当任务数没有超过coreSize,直接交给worker对象执行
        // 如果任务数超过coreSize时,加入任务队列
        // 因为集合workers为共享变量,所以此处也需要加锁
        synchronized (workers){
            if(workers.size() < coreSize){
                Worker worker = new Worker(task);
                System.out.println("新增worker " + worker.toString() + " 任务 " + task.toString());
                workers.add(worker);
                worker.start();
            }else{
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread{
        private Runnable task;

        private Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1.当task不为空,则执行任务
            // 2.当task执行完毕,接着去任务队列中获取并执行
            // 此处使用了短路逻辑
            while(task !=null || (task = taskQueue.pull(timeout, timeUnit)) !=null ){
                try{
                    System.out.println("正在执行: " + task.toString());
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            // 超时获取时,如果未获取到任务,则结束该线程
            synchronized (workers){
                System.out.println("worker 移除:" + this.toString());
                workers.remove(this);
            }
        }
    }
}

3.定义拒绝策略,只定义接口,之后策略由调用者传入。

interface RejectPolicy<T>{
    void reject(BlockingQueue<T> queue, T task);
}

自定义线程池代码测试

public class MyThreadPoolTest {
    public static void main(String[] args) {
         // 定义线程池,传入参数为线程数,超时时间(当获取任务时间超过改时间时,结果等待)
         // 时间单位, 队列容量,拒绝策略,此处出lambda表达式,因为我们实现的拒绝策略只有一个接口,所以可以这样写
         ThreadPool pool = new ThreadPool(1, 1000,
                 TimeUnit.MICROSECONDS, 1,(queue,task)->{
                 // 1.死等
                 // queue.put(task);
                 // 2.带超时的等待
                 // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                 // 3.让调用者放弃任务执行
                 // System.out.println("队列已满放弃等待");
                 // 4.抛出异常
                 throw new RuntimeException("任务执行失败,队列已满" + task);
                 // 5.自己执行
                 // task.run();
         });
        
         // 给线程池提交任务,循环3次,任务为打印,执行每次失眠一秒
        for (int i = 0; i < 3; i++) {
            int id = i+1;
            pool.execute(()->{
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().toString()+ " " + id);
            });
        }
    }
}

测试结果;
在这里插入图片描述
结果分析;

我们创建了线程个数为1的线程池,并且阻塞队列也为1,拒绝策略为直接抛出异常。当有三个任务时,我们可以看到刚开始第一个任务JUC.MyThreadPoolTest$$Lambda$2/2074407503@4dd8dc3到来,线程池创建了线程对象,第二个任务JUC.MyThreadPoolTest$$Lambda$2/2074407503@568db2f2加入的阻塞队列,第一个任务执行。当第三个任务来时,因为线程池中线程正忙,阻塞队列已满,所以根据拒绝策略直接抛出了异常。当两个任务执行完毕之后,线程池中的线程尝试从阻塞队列中继续超时获取,但是超时之后未获取到,所以直接结束,并删除了线程池中的线程,任务结束。
其他的情况,可以自行尝试。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java自定义线程池详解及代码实现(非直接调用ThreadPoolExecutor) 的相关文章

随机推荐

  • idea自带的Maven添加阿里镜像

    打开idea xff0c 并打开设置 在搜索框查找Maven xff0c 可以看到idea使用的Maven路径 xff0c 配置文件路径 xff0c 以及仓库路径 重点是看配置文件 xff1a settings xml 如果在配置文件路径下
  • Ubuntu16.04开机失败—进入tty1终端修复

    Ubuntu16 04开机失败 进入tty1终端修复 如图所示 xff0c 我的Ubuntu16 04开机的时候报错 xff0c 提示 etc profile 文件的第34行出错 我想起了这个是自己安装mysql和sqoop的时候配的路径
  • 下载网页中的视频的两种方法

    方法一 xff1a 使用360或者IE浏览器 1 进入播放视频的网页 xff0c 播放视频并缓冲完全 xff1b 2 点击浏览器 工具 栏菜单中 Internet 选项 xff1b 3 在弹出的窗口中间部位找到 设置 xff1b 4 在新窗
  • tigerVNC的简单使用教程(CentOS的远程桌面连接)

    tigerVNC的简单使用教程 xff08 CentOS的远程桌面连接 xff09 DayDreamingBoy的博客 CSDN博客 tigervnc 1 环境和软件准备 1 CentOS 6 3下 root 64 localhost rp
  • gazebo视角调整

    看见上图中的橘黄色的图标了吗 xff1f 点击下拉框 xff0c 就可以调整自己的视角 xff0c 然后配合Ctrl 43 鼠标拖拽就OK了 参考博客 参考一
  • RTX 线程通信之——内存池

    文章目录 Memory Pool为什么需要内存池 xff1f 什么是内存池 xff1f RTX内存池API 案例 xff1a 按键控制LED灯定义相关创建相关执行相关实验效果 小结参考资料 Memory Pool 内存池 Memory Po
  • springboot项目多环境配置及常见配置名的含义

    强烈推荐大家想学习springboot项目相关知识的 xff0c 可以看一下Gitee上大佬整理的Spring Boot基础教程 xff0c 非常适合初学者和进阶学习 xff1a 传送门 我们在进行项目开发时 xff0c 经常同一个应用需要
  • 聊聊linux中的文件种类、文件名、文件扩展名

    linux中的文件种类 文件名 文件扩展名详解 在使用 ls l指令后可以看到文件的类型 xff0c 其中第一个字符就是代表的文件的类型 xff0c 常见的文件类型是一般文件 和目录文件d 文件的类型 1 正规文件 xff08 regula
  • 华为2288 v5服务器安装centos7.9教程

    华为2288 v5服务器安装centos7 9教程 一 准备工作二 centos启动盘制作三 删除 配置RAID四 选择启动项1 开机按F112 选择u盘启动 五 centos安装1 选择语言2 选择安装方式3 选择安装位置 xff08 重
  • Pytorch-gpu版安装教程【注意:无需提前安装cuda和cudnn】

    Pytorch gpu版安装教程 注意 xff1a 无需提前安装cuda和cudnn 1 首先确保你已经安装好Anaconda2 查看自己电脑上显卡的信息 xff0c 通过显卡控制面板查看3 如何根据想要的cuda的版本下载相应的显卡驱动程
  • python调用有道翻译API进行翻译

    python调用有道翻译API进行翻译 步骤 python调用有道翻译API进行翻译准备调用API所需的APPID以及秘钥1 有道智云注册账号2 有道智云注册账号3 创建实例 xff0c 绑定应用4 查看官方文档 python实现对有道翻译
  • Linux安装cuda10.2

    Linux安装cuda10 2 安装其他版本的cuda也可以参考以下步骤 A 进入NVIDIA官网下载安装文件 百度搜索cuda 10 2 点开第一个网页 找到对应自己系统版本的安装文件命令 在命令行中执行给出的代码 xff0c 下载安装文
  • Windows 10 安装anaconda

    Windows 10 安装anaconda 1 下载anaconda安装包2 安装Anaconda3 配置Anaconda环境变量4 检验是否安装成功4 anaconda换源5 python换源 1 下载anaconda安装包 网页搜索清华
  • 排序算法-基数排序

    排序算法 基数排序 算法思想 基数排序是采用多关键字排序思想 xff08 即基于关键字各位的大小进行排序地 xff09 xff0c 借助 分配 收集 两种操作对单逻辑关键字进行排序 基数排序又分为最高位优先 MSD 降序 排序和最低位优先
  • No module named ‘cv2‘ 解决方法

    No module named cv2 解决方法 1 安装opencv python 在使用的虚拟环境中 xff0c 输入以下命令 pip span class token function install span opencv pyth
  • AttributeError:module ‘distutils‘ has no attribute ‘version

    AttributeError module distutils has no attribute 39 version 在使用torch utils tensorboard时 xff0c 出现错误 xff1a 出错语句from torch
  • CPU、GPU、NPU的区别

    CPU GPU NPU的区别 CPU CPU xff08 CentralProcessing Unit xff09 中央处理器 xff0c 是一块超大规模的集成电路 xff0c 主要逻辑架构包括控制单元Control xff0c 运算单元A
  • jdk(Linux+Windows)环境变量配置

    Windows jdk环境变量配置 xff1a PATH JAVA HOME bin JAVA HOME jre bin CLASSPATH JAVA HOME lib JAVA HOME lib tools jar JAVA HOME l
  • 并发编程-生产者消费者模式Java代码实现

    并发编程 生产者消费者模式Java代码实现 生产者消费者模式 生产者仅负责产生结果数据 xff0c 不关心数据该如何处理 xff0c 而消费者专心处理结果数据 消息队列是有容量限制的 xff0c 满时不会再加入数据 xff0c 空时不会再消
  • Java自定义线程池详解及代码实现(非直接调用ThreadPoolExecutor)

    JJava自定义线程池详解及代码实现 非直接调用ThreadPoolExecutor JDK中的线程池函数ThreadPoolExecutorJDK中的线程池执行任务时的流程自定义线程池业务分析自定义线程池的代码实现 注释详尽1 定义阻塞队