08_ThreadPool线程池

2023-05-16

1. 架构说明

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,ExecutorService,ThreadPoolExecutor这几个类。

Executor接口是顶层接口,只有一个execute方法,过于简单。通常不使用它,而是使用ExecutorService接口:

那么问题来了,怎么创建一个连接池对象呢?通常使用Executors工具类

2. Executors工具类

架构图可以看到Executors工具类,有没有联想到Collections,Arrays等。没错,可以用它快速创建线程池。

List list = Arrays.asList("");
ExecutorService threadPool = Executors.newCachedThreadPool();

  1. Executors.newSingleThreadExecutor():单个线程的线程池。

  2. Executors.newFixedThreadPool(int nThreads):创建一个自定义个数线程的线程池。

  3. Executors.newCachedThreadPool():按内存大小弹性的分配线程数量。

直接编码演示:每种连接池的效果

public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 创建单一线程的连接池
        // ExecutorService threadPool = Executors.newSingleThreadExecutor();
        // ExecutorService threadPool = Executors.newFixedThreadPool(3);
        ExecutorService threadPool = Executors.newCachedThreadPool();

        try {
            for (int i = 0; i < 5; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName() + "执行了业务逻辑");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

① Executors.newCachedThreadPool:  短期任务线程池 

当线程不足时,有新的任务直接创建新线程执行

  • 参数1:线程池运行稳定时需要维护的核心线程数量
  • 参数2: 最大允许创建的线程个数: Integer.MAX_VALUE  创建线程过多可能会导致OOM
  • 参数3+4: 线程池稳定时 闲置的线程的存活时间
  • 参数5:任务阻塞队列    SynchronousQueue不存储元素的阻塞队列
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                          60L, TimeUnit.SECONDS,
                          new SynchronousQueue<Runnable>());
}

② Executors.newFixedThreadPool(5); 固定线程数线程池 

当线程不足时,任务存到了任务队列中,有空闲线程时才会去执行

  • 参数1:核心线程数
  • 参数2:最大可创建的线程数
  • 参数3+4: 非核心线程数以外的线程的空闲存活时间  没有意义
  • 参数5:任务队列 new LinkedBlockingQueue<Runnable>() 最多可以存储Integer.MAX_VALUE多个任务对象(runnable)

任务不能及时处理时,任务队列最多支持Integer最大值个任务,可能会导致OOM

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

③ Executors.newSingleThreadExecutor():执行单个任务的线程池 

最大线程数核心线程数固定为1,任务队列长度没有限制 也可能会导致OOM

public static ExecutorService newSingleThreadExecutor() {    
    return new FinalizableDelegatedExecutorService
                                (new ThreadPoolExecutor(1, 1,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>()));
}

④ Executors.newScheduledThreadPool(5):延迟任务的线程池 

任务队列长度、最大线程数都为Integer最大值,可能会导致OOM 

public ScheduledThreadPoolExecutor(int corePoolSize) {
      super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

3. 线程池7大参数

  1. corePoolSize:线程池中的常驻核心线程数

  2. maximumPoolSize:线程池中能够容纳同时 执行的最大线程数,此值必须大于等于1

  3. keepAliveTime:多余的空闲线程的存活时间 当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到 只剩下corePoolSize个线程为止

  4. unit:keepAliveTime的单位

  5. workQueue:任务队列,被提交但尚未被执行的任务

  6. threadFactory:表示生成线程池中工作线程的线程工厂, 用于创建线程,一般默认的即可

  7. handler:拒绝策略,表示当队列满了,并且工作线程大于 等于线程池的最大线程数(maximumPoolSize)时,如何来拒绝 请求执行的runnable的策略

4. 自定义线程池

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)
                , Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 创建单一线程的连接池
        // ExecutorService threadPool = Executors.newSingleThreadExecutor();
        // 创建固定数线程的连接池
        // ExecutorService threadPool = Executors.newFixedThreadPool(3);
        // 可扩容连接池
        // ExecutorService threadPool = Executors.newCachedThreadPool();
		//创建延时任务的连接池
         //ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
        //executor.scheduleAtFixedRate(()->{
         //   System.out.println("任务正在执行:"+ new Date());
        //},5 , 3 , TimeUnit.SECONDS);
        // 自定义连接池
        ExecutorService threadPool = new ThreadPoolExecutor(2, 5,
                2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                //new ThreadPoolExecutor.AbortPolicy()
                //new ThreadPoolExecutor.CallerRunsPolicy()
                //new ThreadPoolExecutor.DiscardOldestPolicy()
                //new ThreadPoolExecutor.DiscardPolicy()
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("自定义拒绝策略");
                    }
                }
        );

        try {
            for (int i = 0; i < 9; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "执行了业务逻辑");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

5. 自定义线程池线程数说明

开发中我们可以把任务分为计算(CPU)密集型IO密集型

计算(CPU)密集型任务大部份时间用来做计算、逻辑判断,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,任务同时进行的数量应当等于CPU的核心数。一般公式:线程数量=CPU核数+1个。

IO密集型CPU消耗很少,任务的大部分时间都在等待IO操作完成(99%的时间都花在IO上,花在CPU上的时间很少)。此类任务,任务越多,CPU效率越高,但也有一个限度。大部分任务都是IO密集型任务,比如Web应用。一般公式:线程数量=CPU核数/(1-阻塞系数) 阻塞系数为0.8~0.9之间。

6. 线程池底层工作原理

具体流程:

线程池底层工作原理:线程池初始化时,线程数为0
    接收任务时:  重要
      1、先判断线程数是否到达核心线程数
            如果未达到:创建线程执行任务
      2、如果达到,再判断任务队列是否已满
            如果未满:将任务存到任务队列中
      3、如果已满,判断线程数是否达到最大线程数
            如果未到达:创建新的线程执行任务
      4、如果已到达:使用拒绝策略拒绝本次任务
      5、线程池运行稳定时,最终线程数会收缩到核心线程数个
            线程执行完任务会从阻塞队列自动获取任务执行,如果任务队列空了该线程开始统计空闲时间。

重要的事情说三遍:以下重要:以下重要:以下重要:

  1. 在创建了线程池后,线程池中的线程数为零。

  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:

    1. 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;

    2. 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;

    3. 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

    4. 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。

  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

    如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。

    所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

7. 拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。

ThreadPoolExecutor自带的拒绝策略如下:

  1. AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行

  2. CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中 尝试再次提交当前任务。

  4. DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。 如果允许任务丢失,这是最好的一种策略。

以上内置的策略均实现了RejectedExecutionHandler接口,也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略。

拒绝策略: 核心线程数3  最大5  队列长度10
AbortPolicy:抛出异常
           任务队列已满线程数达到最大线程数没有空闲线程时 抛出异常

DiscardOldestPolicy: 丢弃等待时间最长的任务
          1,2,3 任务:创建核心线程处理
          4,5,6,7,8,9,10,11,12,13 任务:存到任务队列中   4最先进入到任务队列
          14,15 任务:创建扩展线程执行任务
          16 任务:线程数已满 任务队列已满 丢弃4任务 5,6,7,8,9,10,11,12,13,16
          17 任务:线程数已满 任务队列已满 丢弃5任务 6,7,8,9,10,11,12,13,16,17

DiscardPolicy: 直接丢弃不能处理的任务
          1,2,3 任务:创建核心线程处理
          4,5,6,7,8,9,10,11,12,13 任务:存到任务队列中   4最先进入到任务队列
          14,15 任务:创建扩展线程执行任务
          16、17 任务:不能处理直接丢弃

CallerRunsPolicy: 调用者执行任务
          不能及时执行的任务由主线程来执行

RejectedExecutionHandler:拒绝策略的接口
 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

08_ThreadPool线程池 的相关文章

随机推荐

  • MySQL常见七种通用的Join查询练习题

    准备数据库表 t dept 和 t emp CREATE TABLE 96 t dept 96 96 id 96 int NOT NULL AUTO INCREMENT 96 deptName 96 varchar 30 DEFAULT N
  • MySQL索引分类

    主键索引 xff1a 设定为主键后数据库会自动建立索引 xff0c innodb为聚簇索引 单值索引 xff1a 即一个索引只包含单个列 xff0c 一个表可以有多个单列索引 唯一索引 xff1a 索引列的值必须唯一 xff0c 但允许有空
  • Docker 部署 MySQL 一主多从

    主从复制的原理 xff1a 1 主库 xff1a 创建一个有权访问binlog日志的从库账号 xff0c 配置需要主从复制的库 有写操作时 xff0c 可以将写操作或者写操作之后的数据记录到日志文件中 binlog 通过一个线程通知需要同步
  • Java笔记(8)——重载(Overload)与重写(Override)的区别

    1 重写 xff08 Override xff09 重写是子类对允许访问的父类的方法进行重新编写的过程 xff0c 方法名 返回值和参数列表不能变 xff0c 方法中的内容可以变化 特点就是 xff1a 子类可以根据自己的需要对父类的方法进
  • ShardingSphere介绍

    官网 xff1a https shardingsphere apache org index zh html 文档 xff1a https shardingsphere apache org document 5 1 1 cn overvi
  • ShardingSphere-JDBC读写分离

    基于之前搭建的mysql主从读写分离使用ShardingSphere JDBC实现读写分离 参考文章 xff1a Docker 部署 MySQL 一主多从 书启秋枫的博客 CSDN博客 CREATE DATABASE mydb2 USE m
  • ShardingSphere-JDBC垂直分片

    什么是数据分片 xff1f 简单来说 xff0c 就是指通过某种特定的条件 xff0c 将我们存放在同一个数据库中的数据分散存放到多个数据库 xff08 主机 xff09 上面 xff0c 以达到分散单台设备负载的效果 数据的切分 xff0
  • ShardingSphere-JDBC水平分片

    项目中可以使用ShardingSphere JDBC将数据存到不同库的表中 一 准备服务器 服务器规划 xff1a 使用docker方式创建如下容器 主服务器 xff1a 容器名server order0 xff0c 端口3310从服务器
  • ShardingSphere-JDBC绑定表

    一 什么是绑定表 指分片规则一致的一组分片表 使用绑定表进行多表关联查询时 xff0c 必须使用分片键进行关联 xff0c 否则会出现笛卡尔积关联或跨库关联 xff0c 从而影响查询效率 例如 xff1a t order 表和 t orde
  • ShardingSphere-JDBC广播表

    一 什么是广播表 指所有的分片数据源中都存在的表 xff0c 表结构及其数据在每个数据库中均完全一致 适用于数据量不大且需要与海量数据的表进行关联查询的场景 xff0c 例如 xff1a 字典表 广播具有以下特性 xff1a xff08 1
  • 01_JUC概述

    1 JUC是什么 xff1f 在 Java 5 0 提供了 java util concurrent 简称JUC 包 xff0c 在此包中增加了在并发编程中很常用的工具类 此包包括了几个小的 已标准化的可扩展框架 xff0c 并提供一些功能
  • 02_Lock锁

    首先看一下JUC的重磅武器 锁 xff08 Lock xff09 相比同步锁 xff0c JUC包中的Lock锁的功能更加强大 xff0c 它提供了各种各样的锁 xff08 公平锁 xff0c 非公平锁 xff0c 共享锁 xff0c 独占
  • 03_线程间通信

    面试题 xff1a 两个线程打印 两个线程 xff0c 一个线程打印1 52 xff0c 另一个打印字母A Z打印顺序为12A34B 5152Z xff0c 要求用线程间通信 public class Demo01 public stati
  • 04_并发容器类

    1 重现线程不安全 xff1a List 首先以List作为演示对象 xff0c 创建多个线程对List接口的常用实现类ArrayList进行add操作 public class NotSafeDemo public static void
  • Java笔记(10)——异常处理

    1 Java异常 Java运行时发生异常可以分为两类 xff1a Error xff1a JVM系统内部错误 资源耗尽等问题产生的异常 Exception xff1a 编程错误或偶然的外在因素导致的 2 常见的异常 2 1 RuntimeE
  • mariadb 数据库连接使用

    今天测试了使用mariadb的使用 xff0c 我使用的springboot 43 mariadb 来操作数据库 xff0c 和以前的代码基本一样 xff0c 就数据变成了mariadb xff0c 驱动还是使用mysql的 pom 文件如
  • 05_JUC强大的辅助类

    JUC的多线程辅助类非常多 xff0c 这里我们介绍三个 xff1a CountDownLatch xff08 倒计数器 xff09 CyclicBarrier xff08 循环栅栏 xff09 Semaphore xff08 信号量 xf
  • 06_Callable接口

    Thread 类 Runnable 接口使得多线程编程简单直接 但Thread类和Runnable接口都不允许声明检查型异常 xff0c 也不能定义返回值 没有返回值这点稍微有点麻烦 不能声明抛出检查型异常则更麻烦一些 public voi
  • 07_阻塞队列(BlockingQueue)

    目录 1 什么是BlockingQueue 2 认识BlockingQueue 3 代码演示 栈与队列概念 栈 Stack xff1a 先进后出 xff0c 后进先出 队列 xff1a 先进先出 1 什么是BlockingQueue 在多线
  • 08_ThreadPool线程池

    1 架构说明 Java中的线程池是通过Executor框架实现的 xff0c 该框架中用到了Executor xff0c ExecutorService xff0c ThreadPoolExecutor这几个类 Executor接口是顶层接