SpringBoot-线程池ThreadPoolExecutor异步处理(包含拆分集合工具类)

2023-11-08

ThreadPoolExecutor VS ThreadPoolTaskExecutor

ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。

在这里插入图片描述

配置文件application.yml

# 异步线程配置 自定义使用参数
async:
	executor:
		thread:
			core_pool_size: 10
			max_pool_size:  100   # 配置最大线程数
     	    queue_capacity:  99988  # 配置队列大小
      		keep_alive_seconds:  20  #设置线程空闲等待时间秒s
      		name:
        		prefix: async-thread-  # 配置线程池中的线程的名称前缀

配置类

@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig{
	
	//自定义使用参数
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;   //配置核心线程数
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;    //配置最大线程数
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;

	/**
		1.自定义asyncServieExecutor线程池
	*/
	@Bean(name = "asyncServiceExecutor")
	public ThreadPoolTaskExecutor asyncServiceExecutor(){
		
		log.info("start asyncServiceExecutor......");

		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		//配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //设置线程空闲等待时间 s
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //配置队列大小 设置任务等待队列的大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        //设置线程池内线程名称的前缀-------阿里编码规约推荐--方便出错后进行调试
        executor.setThreadNamePrefix(namePrefix);

		/**
			rejection-policy:当pool已经达到max size的时候,如何处理新任务
			CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
		*/
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

		//执行初始化
		executor.initialize();
		return executor;
	}

	/**
		公共线程池,利用系统availableProcessors线程数量进行计算
	*/
	@Bean(name="commonThreadPoolTaskExecutor")
	public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor(){
		
		ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
		
		// 返回可用处理器的Java虚拟机的数量
		int processNum = Runtime.getRuntime().availableProcessors();
		int corePoolSize = (int)(processNum / (1-0.2));
		int maxPoolSize = (int)(processNum / (1-0.5));
	
		pool.setCorePoolSize(corePoolSize); // 核心池大小
        pool.setMaxPoolSize(maxPoolSize); // 最大线程数
        pool.setQueueCapacity(maxPoolSize * 1000); // 队列程度
        pool.setThreadPriority(Thread.MAX_PRIORITY);
        pool.setDaemon(false);
        pool.setKeepAliveSeconds(300);// 线程空闲时间		
		
		return pool;
	}

	/**
		自定义defaultThreadPoolExecutor线程池
	*/
	@Bean(name="defaultThreadPoolExecutor",destroyMethod = "shutdown")
	public ThreadPoolExecutor systemCheckPoolExecutorService(){
		
		int maxNumPool=Runtime.getRuntime().availableProcessors();
		return new ThreadPoolExecutor(3,
                maxNumPool,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                //置线程名前缀,例如设置前缀为hutool-thread-,则线程名为hutool-thread-1之类。
                new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
	}
}

异步线程业务类

//自定义asyncServiceExecutor线程池
@Override
@Async("asyncServiceExecutor")
public void executeAsync(List<Student> students,
                         StudentService studentService,
                         CountDownLatch countDownLatch){
	
	try{
		log.info("start executeAsync");
		//异步线程要做的事情
		studentService.saveBatch(students);
		log.info("end executeAsync");
	}finally{
		countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
	}
}

拆分集合工具类

public class SplitListUtils {
    /**
     * 功能描述:拆分集合
     * @param <T> 泛型对象
     * @MethodName: split
     * @MethodParam: [resList:需要拆分的集合, subListLength:每个子集合的元素个数]
     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各个集合组成的列表
     * 代码里面用到了guava和common的结合工具类
     * @Author: yyalin
     * @CreateDate: 2022/5/6 14:44
     */
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {
        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
            return Lists.newArrayList();
        }
        List<List<T>> ret = Lists.newArrayList();
        int size = resList.size();
        if (size <= subListLength) {
            // 数据量不足 subListLength 指定的大小
            ret.add(resList);
        } else {
            int pre = size / subListLength;
            int last = size % subListLength;
            // 前面pre个集合,每个大小都是 subListLength 个元素
            for (int i = 0; i < pre; i++) {
                List<T> itemList = Lists.newArrayList();
                for (int j = 0; j < subListLength; j++) {
                    itemList.add(resList.get(i * subListLength + j));
                }
                ret.add(itemList);
            }
            // last的进行处理
            if (last > 0) {
                List<T> itemList = Lists.newArrayList();
                for (int i = 0; i < last; i++) {
                    itemList.add(resList.get(pre * subListLength + i));
                }
                ret.add(itemList);
            }
        }
        return ret;
    }

    /**
     * 功能描述:方法二:集合切割类,就是把一个大集合切割成多个指定条数的小集合,方便往数据库插入数据
     * 推荐使用
     * @MethodName: pagingList
     * @MethodParam:[resList:需要拆分的集合, subListLength:每个子集合的元素个数]
     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各个集合组成的列表
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:15
     */
    public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){
        //判断是否为空
        if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {
            return Lists.newArrayList();
        }
        int length = resList.size();
        int num = (length+pageSize-1)/pageSize;
        List<List<T>> newList =  new ArrayList<>();
        for(int i=0;i<num;i++){
            int fromIndex = i*pageSize;
            int toIndex = (i+1)*pageSize<length?(i+1)*pageSize:length;
            newList.add(resList.subList(fromIndex,toIndex));
        }
        return newList;
    }

    // 运行测试代码 可以按顺序拆分为11个集合
    public static void main(String[] args) {
        //初始化数据
        List<String> list = Lists.newArrayList();
        int size = 19;
        for (int i = 0; i < size; i++) {
            list.add("hello-" + i);
        }
        // 大集合里面包含多个小集合
        List<List<String>> temps = pagingList(list, 100);
        int j = 0;
        // 对大集合里面的每一个小集合进行操作
        for (List<String> obj : temps) {
            System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj));
        }
    }

}

造数据,进行多线程异步插入

public int batchInsertWay() throws Exception {
        log.info("开始批量操作.........");
        Random rand = new Random();
        List<Student> list = new ArrayList<>();
        //造100万条数据
        for (int i = 0; i < 1000003; i++) {
            Student student=new Student();
            student.setStudentName("大明:"+i);
            student.setAddr("上海:"+rand.nextInt(9) * 1000);
            student.setAge(rand.nextInt(1000));
            student.setPhone("134"+rand.nextInt(9) * 1000);
            list.add(student);
        }
        //2、开始多线程异步批量导入
        long startTime = System.currentTimeMillis(); // 开始时间
        //boolean a=studentService.batchInsert(list);
        List<List<Student>> list1=SplitListUtils.pagingList(list,100);  //拆分集合
        CountDownLatch countDownLatch = new CountDownLatch(list1.size());
        for (List<Student> list2 : list1) {
            asyncService.executeAsync(list2,studentService,countDownLatch);
        }
        try {
            countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
            long endTime = System.currentTimeMillis(); //结束时间
            log.info("一共耗时time: " + (endTime - startTime) / 1000 + " s");
            // 这样就可以在下面拿到所有线程执行完的集合结果
        } catch (Exception e) {
            log.error("阻塞异常:"+e.getMessage());
        }
        return list.size();

    }

在这里插入图片描述
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot-线程池ThreadPoolExecutor异步处理(包含拆分集合工具类) 的相关文章

随机推荐

  • org.apache.poi.poifs.filesystem.NotOLE2FileException: Invalid header signature;

    今天学习poi导入导出excel 然后报错valid header signature 经过排查是因为没有关闭流 workbook close 关闭之后就可以了
  • kafka常用命令汇总(亲测自用)

    文章目录 一 启动kafka 二 查看命令 三 创建topic 四 生产者 五 消费者 六 修改topic 七 删除topic 一 启动kafka kafka 2 13 3 3 1 zookeeper 3 4 14 2 13 3 3 1 前
  • 基础篇——Pycharm的安装与使用windows+ubuntu 初学者此篇够用

    简介 Pycharm是python编程过程中最为推荐的编辑调试软件之一 其使用简单 界面友好 也成了学习Python路上必须学会的软件之一 本篇教程简单介绍一下windows用户从安装到日常使用的基本功能 其他系统也可简单参考 软件安装 P
  • 嵌入式Linux学习笔记 1-14 异常与中断

    1 异常与中断的概念引入与处理流程 上图解释了何为中断何为异常 其中中断也是属于一种异常 引申拓展为ARM对异常 中断 的处理过程 1 初始化 1 设置中断源 让他可以产生中断 如某个按键可以产生中断的话 我们可以设置他的gpio引脚为中断
  • LeetCode刷题笔记--015. 三数之和

    题目描述 给定一个包含 n 个整数的数组 nums 判断 nums 中是否存在三个元素 a b c 使得 a b c 0 找出所有满足条件且不重复的三元组 注意 答案中不可以包含重复的三元组 例如 给定数组 nums 1 0 1 2 1 4
  • 【算法】蓝桥杯dfs深度优先搜索之凑算式总结

    本文 算法 蓝桥杯dfs深度优先搜索之凑算式总结 相关文章 算法 蓝桥杯dfs深度优先搜索之排列组合总结 算法 蓝桥杯dfs深度优先搜索之图连通总结 前言 曾几何时这个词现在用正适合不过了 曾几何时我还是对dfs算法一脸懵x的状态 虽说大二
  • Java中32位的最高位为1的二进制数如何转换成整数

    int类型的 3的32位表示为 11111111111111111111111111111101 将32位翻转的时候应该为 10111111111111111111111111111111 当时在LeetCode做这题的时候想的是用字符串翻
  • IDEA开发工具11---Python引入第三方包

    如要在工程文件中引入requests 但是本机上并没有安装这个包 在工程文件中输入import requests 然后Alt Enter 然后回车 IDEA会自动安装这个包
  • ESP8266 连接 MQTT

    ESP8266 连接 MQTT 主控芯片 MM32F2377 MB 039 WiFi 适配器 ESP8266 开发环境 IAR 7 80 4 MQTT 模拟服务器 MQTT fx MQTT MQTT is an OASIS standard
  • 解决windows 下使用 mingw编译器 调试时 无法跟进源码

    windows 下使用 mingw编译器 调试时 无法跟进源码 最近在公司使用QT 开发 官方在线下载的 安装的QT mingw 都是没有debug版本的 由于没有debug版本动态库 所以你调试的时候压根就无法跟进QT源代码里 那么找问题
  • 关于在windows下启动zkServer.cmd闪退的解决办法

    1 下载zookeeper注册中心 下载地址 http www apache org dyn closer cgi zookeeper 下载后解压即可 进入D apach zookeeper 3 4 5 bin 双击zkServer cmd
  • ES6 数组内对象去重

    在实际的项目当中不可避免的会遇到数组里面元素重复情况 下面将介绍几种ES6数组去重的方法 1 使用Set去重 const arr 张三 张三 三张三 let set new Set arr set 自带去重 Set 张三 三张三 conso
  • Ubuntu搭建Samba服务-学习记录

    文章目录 Ubuntu安装Samba流程 Samba配置文件 Samba添加账户 配置文件修改 Samba服务控制 设置开机自动启动 通过systemctl 启动服务 通过 rc local 启动 Windows访问 参考链接 当前文章仅用
  • 福禄克电缆检测仪MS2-100有哪些功能?

    现在的通信技术人员有很多问题需要处理 而不仅仅是电缆问题 在确定连接问题的原因之前 必须先排除可能存在的电缆和服务等问题 是否有电话电压 极性是什么 远端有以太网交换机吗 PoE 是否可用 福禄克电缆检测仪MS2 100可以确认这些问题 为
  • LaTeX:插入PDF出现版本警告

    LaTeX LaTeX LATE X 插入PDF出现版本警告 文章目录 LaTeX LaTeX LATE X 插入PDF出现版本警告 1 问题描述 2 解决
  • 微信封号被限制的几种原因及解决方法

    微信被限制了也不需要紧张 找到原因对应处理就行了 一 总结一下微信微信被限制登录的几种原因 1 频繁的违规操作 微信违规操作了 比方说频繁的添加微信好友 发布违规信息 使用第三方非法破解软件等 这些行为都属于微信明令禁止的行为 如果触犯了微
  • vim连接外接显示器后右侧无法选中的问题

    RT 解决办法 在 vimrc添加如下代码 if has mouse sgr set ttymouse sgr else set ttymouse xterm2 end 原文连接 https ifconfiger com articles
  • 4.jeston nano NX安装系统、pycharm

    笔者有幸通过项目一次入手一块jeston Xavier NX和jeston nano 随即开始研究安装系统和pycharm 其中系统换了4个镜像才安装成功 其实下载安装官方的就行 其他的包括店里的都不要用 1 安装系统 务必注意镜像要下对
  • 如何修改VsCode的背景图片

    步骤 第一步 准备一张图片 图片路径最好不要出现中文 第二步 在VsCode中安装插件 搜索 background 安装这个插件 第三步 这个插件安装成功之后 里面自带了一些背景 如果喜欢可以不用换 也可以根据需要自定义 找到 settin
  • SpringBoot-线程池ThreadPoolExecutor异步处理(包含拆分集合工具类)

    ThreadPoolExecutor VS ThreadPoolTaskExecutor ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理 配置文件application yml 异步线程配