Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法

2023-11-01

1. 背景

当前因为工作需求,要发送大量Http请求,经过实践遍历发送需要6小时才能发送完毕,如果单线程发送请求会导致主线程阻塞。就会存在以下问题:

  1. 前端用户等待响应时间过长,无法进行下一步操作,不利于用户操作系统
  2. 响应时间过长超过Tomcat服务器会话时间,导致前后端的请求重新连接,这会出现抛出java.io.IOException: 你的主机中的软件中止了一个已建立的连接;重而终止了还未完成的Http发送任务
  3. 如果主线程其他任务如:定时Excel数据批量导入,文件上传等等;很容易因为文件格式问题,导致抛出异常,从而把Http的任务中断
  4. 夜长梦多,长时间发送请求,无法判断是否执行完毕;如果抛出异常,或是需要重启服务,无法判断执行到什么阶段,定位重发位置需要耗费很多时间精力,得不偿失,只能全部重发(一样面临以上问题)。

2. 解决思路

经过考虑可以使用多线程来解决问题,主线程负责开子线程处理请求。其中根据业务需要可以将多线程分为以下2类

  • 任务类型1. 开完子线程就返回响应用户的请求。
  • 任务类型2. 开完子线程后等待子线程响应结果后,再响应用户请求
    在这里插入图片描述

在这里插入图片描述


3. 具体实现

Springboot中已经有集成的ThreadPoolTaskExecutor线程池可以供调用(注意区分Java自带的ThreadPoolExecutor),虽然2种线程池的提供的具体方法不一定一样,但是调度线程过程原理是通用,下面贴出ThreadPoolExecutor的调度线程过程作为创建ThreadPoolTaskExecutor的参考。

在这里插入图片描述

4. 开始编写代码

@EnableAsync 注解启用Spring 异步方法执行功能

当前的线程池 :

  • 核心线程池(CorePool) : 10 ;
  • 最大线程池(MaxPool) : 20 ;
  • 队列长度(QueueCapacity) : 200;
  • 拒绝策略(RejectedExecutionHandler) : CallerRunsPolicy

表示:

  • 如果 请求创建的线程数 <= 10 + 200, 只会使用核心线程池的10个线程执行任务;
  • 如果 10 + 200 < 请求创建的线程数 , 创建额外的10个线线程处理任务, 线程池中同时拥有20个线程处理任务

    使用CallerRunsPolicy策略,主线程还没将 (请求创建的线程数 - 10 + 200 ) 的子线程创建完毕, 主线程处于阻塞状态,需要将所有子线程创建完毕才可以返回响应,即使任务是任务类型1,它的响应时间也会无限接近于 任务类型2;根据业务需求可以通过设置更长的队列长度来解决

    线程处理完任务后,(最大线程池 - 核心线程池) > 0 && 经过60s空闲时间,就会回收除核心线程池外的空闲线程
@EnableAsync //注解启用Spring 异步方法执行功能
@Configuration
public class AsyncTaskConfig {

    @Bean("AsyncTaskThreadPool")
    public Executor threadPoolExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数:线程池创建时候初始化的线程数
        executor.setCorePoolSize(10);
        // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(20);
        // 缓冲队列:用来缓冲执行任务的队列
        executor.setQueueCapacity(200);
        // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(60);
        // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("AsyncTaskThreadPool-");
        // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
        //AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常
        //DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态
        //DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
        //CallerRunsPolicy:不丢弃任务 由调用线程处理该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
        executor.setAllowCoreThreadTimeOut(true);
        executor.initialize();
        return executor;
    }

}


@Async 根据添加到自定义线程池中执行

@Component
public class AsyncRequestComponent {

    @Async("AsyncTaskThreadPool")
    public CompletableFuture<String> mockHttpRequest(String url) throws InterruptedException, IOException {
        String threadName = Thread.currentThread().getName();
        System.out.println("线程" + threadName + "开始调用,输出:" + url);
        String result = "";
        // HttpClient RestTemplate Feign or 其他Http请求框架
        // 或者其他需要 异步并发 的 逻辑代码 
        // Thread.sleep(50);
        result = HttpClientUtil.get(url);

        System.out.println("线程" + threadName + "调用结束,输出:" + url);
        return CompletableFuture.completedFuture(result);
    }
}


@Slf4j
@SpringBootTest
class DemoTestApplicationTests {
    @Autowired
    private AsyncRequestComponent asyncRequestComponent;

     @Test
    void testAsyncThreadPerformance() throws InterruptedException, ExecutionException {

        long start = System.currentTimeMillis();
        List<CompletableFuture<String>> list = new ArrayList<>();
        // 通过修改循环次数来查看 任务数<缓存队列+核心线程 和 任务数>缓存队列+核心线程 的主线程花费时间
        // 如果 任务数<缓存队列+核心线程 主线程不需要等待
        // 如果 任务数>缓存队列+核心线程 主线程需要等待 因为后面任务还没创建成功
        for(int i=0 ;i < 2000; i++){
            CompletableFuture<String> future= asyncRequestComponent.mockHttpRequest(i);
            list.add(future);
        }

        // 任务类型1 主线程不需要等待所有子线程执行完毕,将下面的for循环注释
        // 任务类型2 主线程需要等待所有子线程执行完毕:需要遍历future执行get方法等待子线程执行完毕
        for(Future<?> future : list){
            while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
                if (future.isDone() && !future.isCancelled()) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
                    String result = (String) future.get();//获取结果
                    System.out.println("任务i=" + result + "获取完成!" + new Date());
                    break;//当前future获取结果完毕,跳出while
                } else {
                    Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
                }
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("主线程花费时间:"+(end-start));
    }

}


5. 结论

  • 如果希望主线程永远不会阻塞,不设置队列长度,因为默认长度设置为Integer.MAX_VALUE; 因为队列长度很长,不会额外再创建线程处理了,所以核心线程(CorePool) = 最大线程(MaxPool)
  • 如果本身业务用户可以容忍一定范围的响应延迟,这就需要根据实际出发,统计子线程任务的平均执行实际,判断核心线程数、最大线程数、缓冲队列的容量



题外话:如何创建线程池

CPU密集型:对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 8 核的 CPU,每个核一个线程,理论上创建 8 个线程就就可以;理论公式:最大线程数 = CPU核数 + 1

IO 密集型任务: IO 密集型任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。理论公式 : 最大线程数 = 最大线程数 = CPU核心数 / (1 - IO_TIME/REQUEST_TIME) ,比如在某个请求中,请求时长为10秒,调用IO时间为8秒,这时我们阻塞占百分比就是80%,有效利用CPU占比就是20%,假设是八核CPU,我们线程数就是8 / (1 - 80%) = 8 / 0.2 = 40个

最大线程数:

  • CPU密集型:CPU 核心数 + 1
  • IO密集型:CPU 核心数 / (1 - IO_TIME/REQUEST_TIME)

核心线程数: 核心线程数 = 最大线程数 * 20%


获取CPU核心数命令:

window - Java : Runtime.getRuntime().availableProcessors()//获取逻辑核心数,如6核心12线程,那么返回的是12

linux - 物理CPU: cat /proc/cpuinfo| grep “physical id”| sort| uniq| wc -l

linux - 物理CPU中core: cat /proc/cpuinfo| grep “cpu cores”| uniq

linux - 逻辑CPU: cat /proc/cpuinfo| grep “processor”| wc -l


偷懒大法:

  • 核心线程数:CPU核心数
  • 最大线程数:CPU逻辑CPU( CPU核心数 * 2)

参考资料:

https://blog.csdn.net/weixin_49258262/article/details/125463819?spm=1001.2014.3001.5506
https://blog.csdn.net/Wei_Naijia/article/details/127028398?spm=1001.2014.3001.5506
https://blog.csdn.net/wozyb1995/article/details/125044992?spm=1001.2014.3001.5506
https://blog.csdn.net/iampatrick_star/article/details/124490586?spm=1001.2014.3001.5506
https://www.cnblogs.com/651434092qq/p/14240406.html
https://juejin.cn/post/6948034657321484318
https://juejin.cn/post/7072281409053786120

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法 的相关文章

  • Java:扩展类并实现具有相同方法的接口

    可能无法完成以下操作 我收到编译错误 继承的方法 A doSomthing int 无法隐藏 B 中的公共抽象方法 public class A int doSomthing int x return x public interface
  • Java Logger 未记录到 Netbeans 中的输出

    我正在 Netbeans 中使用 Maven 启动一个 Java 项目 我编写了一些代码来使用 Logger 类进行日志记录 但是 日志记录似乎不起作用 在程序开始时 我运行 Logger getLogger ProjectMainClas
  • 如何在java中将数组值排序为循环格式?

    我的数组值如下 String value 1 2 3 4 5 6 7 8 9 10 假设如果我将值 5 传递给 tat 数组 它应该按如下顺序排序 5 6 7 8 9 10 1 2 3 4 怎么办 有人帮忙吗 感谢你 你需要的就是所谓的轮换
  • 垃圾收集器如何在幕后工作来收集死对象?

    我正在阅读有关垃圾收集的内容 众所周知 垃圾收集会收集死亡对象并回收内存 我的问题是 Collector 如何知道任何对象已死亡 它使用什么数据结构来跟踪活动对象 我正在研究这个问题 我发现GC实际上会跟踪活动对象 并标记它们 每个未标记的
  • 与 Eclipse 中的 Java Content Assist 交互

    作为我的插件项目的一部分 我正在考虑与 Eclipse 在 Java 文件上显示的内容辅助列表进行交互 我正在尝试根据一些外部数据对列表进行重新排序 我看过一些有关创建新内容辅助的教程 但没有看到有关更改现有内容辅助的教程 这可能吗 如果是
  • 如何在 Java 中向时间戳添加/减去时区偏移量?

    我正在使用 JDK 8 并且玩过ZonedDateTime and Timestamp很多 但我仍然无法解决我面临的问题 假设我得到了格式化的Timestamp在格林威治标准时间 UTC 我的服务器位于某处 假设它设置为Asia Calcu
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • Java 中如何将 char 转换为 int? [复制]

    这个问题在这里已经有答案了 我是Java编程新手 我有例如 char x 9 我需要得到撇号中的数字 即数字 9 本身 我尝试执行以下操作 char x 9 int y int x 但没有成功 那么我应该怎么做才能得到撇号中的数字呢 ASC
  • Sun 在 EDT 之外做 GUI 工作的演示?

    我正在看SplashDemo java http download oracle com javase tutorial uiswing examples misc SplashDemoProject src misc SplashDemo
  • 如何在 ant 中为 junit 测试设置 file.encoding?

    我还没有完全完成file encoding 和 ant https stackoverflow com questions 1339352 how do i set dfile encoding within ants build xml
  • 如何使用 JMagick 转换色彩空间?

    如何使用 JMagick API 转换色彩空间 例如 CMYK gt RGB 和 RGB gt CMYK None
  • 提高 PostgreSQL 1 亿数据左连接查询性能

    我在用Postgresql 9 2 version Windows 7 64 bit RAM 6GB 这是一个Java企业项目 我必须在我的页面中显示订单相关信息 有三个表通过左连接连接在一起 Tables TV HD 389772 行 T
  • 如何在JPanel中设置背景图片

    你好 我使用 JPanel 作为我的框架的容器 然后我真的想在我的面板中使用背景图片 我真的需要帮助 这是我到目前为止的代码 这是更新 请检查这里是我的代码 import java awt import javax swing import
  • Spring @Cacheable 和 @Async 注解

    我需要缓存一些异步计算的结果 具体来说 为了克服这个问题 我尝试使用 Spring 4 3 缓存和异步计算功能 作为示例 我们采用以下代码 Service class AsyncService Async Cacheable users C
  • 如何解决 openapi-generator-maven-plugin 使用已弃用的类?

    我尝试将 openapi generator maven plugin 与 Spring Boot 2 4 3 一起使用 根据互联网上的一些示例 按照 openapi yaml 中制定的规范生成代码 但没有示例提供包含所需的完整依赖项 该插
  • Android S8+ 警告消息“不支持当前的显示尺寸设置,可能会出现意外行为”

    我在 Samsung S8 Android 7 中收到此警告消息 APP NAME 不支持当前的显示尺寸设置 可能会 行为出乎意料 它意味着什么以及如何删除它 谢谢 通过添加解决supports screens 机器人 xlargeScre
  • java XMLSerializer 避免复杂的空元素

    我有这个代码 DocumentBuilderFactory factory DocumentBuilderFactory newInstance DocumentBuilder builder factory newDocumentBuil
  • Java 正则表达式中的逻辑 AND

    是否可以在 Java Regex 中实现逻辑 AND 如果答案是肯定的 那么如何实现呢 正则表达式中的逻辑 AND 由一系列堆叠的先行断言组成 例如 foo bar glarch 将匹配包含所有三个 foo bar 和 glarch 的任何
  • Log4j2 ThreadContext 映射不适用于parallelStream()

    我有以下示例代码 public class Test static System setProperty isThreadContextMapInheritable true private static final Logger LOGG
  • Java 和/C++ 在多线程方面的差异

    我读过一些提示 多线程实现很大程度上取决于您正在使用的目标操作系统 操作系统最终提供了多线程能力 比如Linux有POSIX标准实现 而windows32有另一种方式 但我想知道编程语言水平的主要不同 C似乎为同步提供了更多选择 例如互斥锁

随机推荐

  • SpringBoot2.7.2 版本配置swagger3的方法及教程

    原因 对SpringBoot2 7 2版本 swagger2 x版本不再适用 所以就选择了swagger3版本 但是相较于swagger2版本 swagger3版本更加麻烦 具体教程如下 方法 第一步 引入依赖
  • 【计算机毕业设计】java ssm在线学习系统 在线学习平台

    毕设帮助 源码交流 技术解答 见文末 一 前言 以前 我们的在线学习主要是通过面对面的讲授 这样 有很多优势 教师可以与学生直接交流 但是也有许多不尽人意的地方 课堂在线学习很大程度上受到时间和空间的限制 浪费了在线学习资源同时对于学生的进
  • 【算法题目】Leetcode算法题思路:两数相加

    在LeetCode上刷了一题比较基础的算法题 一开始也能解出来 不过在解题过程中用了比较多的if判断 看起来代码比较差 经过思考和改进把原来的算法优化了 题目 给出两个 非空 的链表用来表示两个非负的整数 其中 它们各自的位数是按照 逆序
  • 数据结构(C#)-- 贪心算法解决背包问题

    using System using System Collections Generic using System Linq using System Text using System Data using System Collect
  • 计算机的基本组成及工作原理

    计算机的基本组成及工作原理 1 3 1 计算机系统的组成 计算机系统是由硬件系统和软件系统两大部分组成 这一节将分别介绍计算机硬件系统和软件系统 计算机硬件是构成计算机系统各功能部件的集合 是由电子 机械和光电元件组成的各种计算机部件和设备
  • pytorch: Pool 和 AdaptivePool 的区别和使用方法

    在 pytorch 中 池化层 Pooling 有两种操作方式 一种是手动设计 另一种是自适应池化 一 手动设计 池化层操作 一般有最大值 max 池化和均值 avg 池化 而根据尺寸又有一维 二维 三维 所以 手动设计的池化层有6种函数
  • GitHub加速教程

    转载 GitHub 加速教程 GitHub Hosts 仓库提供最新的GitHub hosts地址 你可以自行配置hosts 但是最佳实践是使用 SwitchHosts 管理你的 hosts 可以阅读文章 SwitchHosts 还能这样管
  • 在Java中尽量使用包装类Integer而不使用int

    1 在MySQL中没有给字段赋值默认为null 当你从数据库中查出来也是null 如果该字段在对应的Java代码中是int类型 null不能对应int类型 因为int代表的是基本数据类型 只能是基本的数字 2 实体类的属性你可以给它赋值也可
  • 三分钟教你学Git(二十一) - 复制连续多个提交

    有时候我们有一个分支A 里边包含了提交A1 gt A2 gt A3 gt A4 gt A5等 然后我们又有一个分支B 里边包含了提交A1 gt A2 gt B1 gt B2 gt B3 现在我们想把分支A中的A3 A4 A5版本复制到分支B
  • vue3项目实战+element-plus

    记录自己搭建前端项目的学习过程和开发过程 希望一起学习进步 采用Vue3 element plus axios vue router sass 目前刚开始是用到了这些 随着开发慢慢更新 npm是比较慢的 所以我用的是pnpm 安装指令 np
  • 播放raw下的音乐文件

    在开发的过程中 我们需要一些不需要用户改变的音乐或图像文件 这些文件存在安卓res raw文件夹中 下面就介绍如何播放res raw文件夹中的乐 1 在res目录下新建一个raw文件夹 2 界面上 xml文件中有一个按钮控件 当我们点击按钮
  • 迭代器模式c++实现

    参考书籍 Head First设计模式 需求场景 餐馆和咖啡馆的菜单分别是用数组和容器 vector 存储的 有一天餐馆和咖啡店合并了想要打印菜单 必须实现两个不同的遍历 数组用sizeof计算长度 用 取元素 容器用size取长度 用at
  • 第二阶段 归纳总结

    总结归纳 文章目录 总结归纳 计算机理论基础为 的童鞋 一二阶段是基础 三四阶段是应用 1 Linux操作系统使用 2 数据机构 3 IO网络编程 4 并发编程 5 正则表达式 6 Mysql数据库 7 git使用 8 项目综合 聊天室 f
  • 2023年十大无代码测试工具 每个测试人员都应该知道

    编码 跟很多技能一样 需要多年的实践才能熟练掌握 让测试人员编写一堆代码组成测试程序 修复程序中的错误 并在几周内完成大量工作很有难度 根据2019 2020年世界质量报告 在敏捷项目中应用自动化测试的最大的问题就是很多测试人员缺乏专业编程
  • 双因素方差分析 matlab,MATLAB的双因素有交互效应的方差分析

    二 MATLAB的双因素有交互效应的方差分析在两个因素的试验中 不但每一个因素单独对试验结果起作用 往往两个因素的不同水平组合还会产生一定的合作效应 在方差分析中称为交互效应 交互效应在对因素方差分析中 通常是当成一个新因素来处理 设因素A
  • Django学习

    创建项目 django admin startproject mysite 目录结构如下 查看帮助 终端页面输入以下命令 django admin help django admin help Type django admin help
  • 缺陷检测数据集

    缺陷检测数据集 东北大学钢材表面缺陷数据集 铁质缺陷 东北大学钢材表面缺陷数据集 简介 该数据集是东北大学宋克臣团队制作而成 是钢材表面缺陷数据集 共有1800张图片 包含六种类型共有六种缺陷 crazing inclusion patch
  • Vue中路由vue-router的使用

    基本使用 1 安装vue router 命令 npm i vue router 2 应用插件 import VueRouter from vue router Vue use VueRouter 3 编写router配置项 import V
  • python中判断 nan 的几种方式

    float NaN 判断 float NaN float NaN pandas中的 nan 判断 pd isnull df1 df1 是DataFrame对象 也可以是Series对象 pd isna 直接判断DataFrame某一列是否为
  • Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法

    1 背景 当前因为工作需求 要发送大量Http请求 经过实践遍历发送需要6小时才能发送完毕 如果单线程发送请求会导致主线程阻塞 就会存在以下问题 前端用户等待响应时间过长 无法进行下一步操作 不利于用户操作系统 响应时间过长超过Tomcat