【JUC】CompletableFuture超时处理 配置线程池

2023-05-16

CompletableFuture

  • 简介
  • 使用方法
  • 代码

简介:

  项目中一个统计的业务场景,使用原生的CompletableFuture异步多个任务查询mysql数据,少量请求无问题,但是测试过程中大量请求进来,线程没有设置超时时间,导致大量线程处于等待状态,接口响应缓慢。
  因此需要在原生的CompletableFuture中封装,使用自定义线程池、设置超时时间保证接口稳定性。


使用方法:

  工具类中主要封装了 supplyAsync()、runAsync()、allOf()这三个方法,目前我的项目中业务场景这三个方法比较常用。
  将工具类引入项目中,CompletableFuture.supplyAsync() 可直接替换为FutureUtil.supplyAsync(),其他方法同理,替换类名即可,示例如下

// 有返回值的异步任务
FutureUtil.supplyAsync(() -> {
	// 逻辑代码
})

// 无返回值的异步任务
FutureUtil.runAsync(() -> {
	// 逻辑代码
})

// 统一等待异步任务执行完成
FutureUtil.allOf(() -> {
	// 逻辑代码
})

默认超时时间为可在工具类中设置,可手动设置超时时间,示例如下:

// 前两个参数分别为 超时时间、时间单位
FutureUtil.allOf(1000, TimeUnit.MILLISECONDS,() -> { // 异步任务 });

FutureUtil.runAsync(1000, TimeUnit.MILLISECONDS,() -> { // 逻辑代码 });
FutureUtil.supplyAsync(1000, TimeUnit.MILLISECONDS,() -> { // 逻辑代码 });

详细使用可自行查看工具类中源码,代码贴下面了👇👇👇


代码:

package com.wh.org.basecommon.utils.futures;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* 多任务处理工具类
* @Author WhHao
* @Date 2022/8/10 16:04
* @Return
*/
@Slf4j
public class FutureUtil {

    /**
     * cpu 核心数
     */
    private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    // 最大超时时间
    private static final int TIMEOUT_VALUE = 1500;
    // 时间单位
    private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;


    /**
     * Singleton delay scheduler, used only for starting and * cancelling tasks.
     */
    public static final class Delayer {

        static final ScheduledThreadPoolExecutor delayer;

        /**
         * 异常线程,不做请求处理,只抛出异常
         */
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }

        static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }

        static final class DaemonThreadFactory implements ThreadFactory {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureScheduler");
                return t;
            }
        }
    }

    /**
     * 根据服务器cpu自定义线程池
     */
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            AVALIABLE_PROCESSORS,
            3 * AVALIABLE_PROCESSORS,
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    /**
     * 有返回值的异步
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier){
        return supplyAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,supplier);
    }

    /**
     * 有返回值的异步 - 可设置超时时间
     * @param timeout
     * @param unit
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit,Supplier<T> supplier){
        return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 无返回值的异步
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(Runnable runnable){
        return runAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,runnable);
    }

    /**
     * 无返回值的异步 - 可设置超时时间
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(long timeout, TimeUnit unit,Runnable runnable){
        return CompletableFuture.runAsync(runnable,threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 统一处理异步结果
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(CompletableFuture... futures){
        return allOf(TIMEOUT_VALUE,TIMEOUT_UNIT,futures);
    }

    /**
     * 统一处理异步结果 - 可设置超时时间
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(long timeout, TimeUnit unit,CompletableFuture... futures){
        return CompletableFuture.allOf(futures)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 异步超时处理
     * @param timeout
     * @param unit
     * @param <T>
     * @return
     */
    public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return result;
    }

    public static <T> CompletableFuture<T> timeoutAfter() {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT);
        return result;
    }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【JUC】CompletableFuture超时处理 配置线程池 的相关文章

随机推荐

  • STM32CubeMX学习教程之五:PWM实现呼吸灯效果

    完整源码下载 xff1a https github com simonliu009 STM32CubeMX PWM Output 软件 xff1a STM32CubeMX V4 25 0 System Workbench V2 4 固件库版
  • STM32CubeMX学习教程之十:硬件I2C读写AT24C02

    完整源码下载 xff1a https github com simonliu009 STM32CubeMX hardware I2C AT24C02 网上有流传已久一种说法 xff0c 就是STM的I2C有bug xff0c 不好用 确实很
  • Visual Studio Code 的 includePath 全局设置

    这几天折腾使用Visual Studio Code 编写 ESP8266 non os 代码 xff0c 基本设置都没问题 xff0c 但是就是自动补全折腾很久没弄好 在折腾了一圈插件之后 xff08 包括Auto Import C 43
  • 使用.gitignore忽略文件或者文件夹及其失效解决方法

    git如果需要忽略某个文件夹 xff0c 可以在初始化之后 xff0c 在仓库根目录下创建一个 gitignore文件 xff0c 添加需要忽略的文件和文件夹即可 我们也不需要从头写 gitignore文件 xff0c GitHub已经为我
  • python源代码文件加密

    由于python设计的初衷是开源的 xff0c 因此py文件是可以直接看到源码的 但开发商业软件 xff0c 代码加密保护就比较重要了 python编译后程序 pyc文件是py文件经过编译后生成的二进制文件 xff0c 因此可以发布pyc文
  • gnu binutils

    The GNU Binutils are a collection of binary tools The main ones are ld the GNU linker as the GNU assembler But they also
  • android wakelock

    1 android休眠控制锁 powermanager acquir 获取控制锁 powermanager release 释放控制锁 2 在powerManagerService中 xff0c 有检测当前framework层中的wakel
  • 产品经理常用术语

    产品经理常用术语 长尾理论 网络时代兴起的一种新理论 xff0c 由于成本和效率的因素 xff0c 当商品储存流通展示的场地和渠道足够宽广 xff0c 商品生产成本急剧下降以至于个人都可以进行生产 xff0c 并且商品的销售成本急剧降低时
  • 传统CD车机面板操作说明

    转自http bbs hifidiy net thread 675597 1 1 html 一般CD车机带USB SD卡的面板操作说明 1 1 控制面板外观图 2 0 基本 共同 功能操作说明 2 1 开启和关闭电源 静音开关 按 钮为开启
  • 汽车总线系统通信协议

    天合汽车零部件 xff08 上海 xff09 有限公司 上海交通大学区域光纤通信网与新型光通信系统国家重点实验室 xff08 上网时间 xff1a 2006 11 xff09 摘要 xff1a 本文主要针对汽车电子控制系统和车载多媒体系统
  • 算法——连续性后处理(把26邻域连续的变成6邻域连续的)

    文章目录 1 概念 1 1 6邻域连续 1 2 18邻域连续 1 3 26邻域连续 1 4 总结 2 目标 3 严格一点的 3 1 原理描述 3 1 1 18邻域连续补充 3 1 2 26邻域连续补充 3 2 代码实现 C 4 宽松一点的
  • RedHat Linux下安装JDK1.7报错Permission denied

    在RedHat Linux5 中安装JDK1 7时 xff0c 当我解压jdk xff0c 并且配置好了环境变量 xff0c 测试的时候 xff0c 报以下错误 root 64 jingfeng01 java version Error d
  • 几种压缩算法的压缩和速度比较

    Quick Benchmark Gzip vs Bzip2 vs LZMA vs XZ vs LZ4 vs LZO EDIT Add zstd Contents hide 1 Selected archives2 Test conditio
  • DDR低功耗模式

    DDR规格 xff1a DDR工作状态图 xff1a DDR 刷新描述 xff1a 电特性 xff1a 工作模式简介 xff1a 1 1 自刷新模式 xff08 Self Refresh Mode xff09 DDR4 SDRAM中自刷新超
  • 嵌入式Linux的低功耗策略

    引 言 由于Linux系统具备嵌入式操作系统需要的很多特色 xff0c 如适应于多种CPU和多种硬件平台 性能稳定 可裁剪性很好 源码开放 研发和使用简单等 现在 xff0c 基于Linux应用的嵌入式设备日益增多 xff0c Linux正
  • libevent实现的HTTP Server

    在使用C语言编码时 有时候需要实现一个HTTP接口 我们可以选择使用libevent库来实现 以下代码演示了使用libevent 并同时支持多线程处理HTTP的请求 头文件 引入的头文件 span class token macro pro
  • Python爬虫完整案例 - 爬取百度百科词条信息

    概述 一个完整的爬虫 xff0c 一般由以下5个组件构成 1 URL管理器 负责维护待爬取URL队列 和已爬取URL队列 xff0c 必须拥有去重功能 2 HTML下载器 负责根据调度器从URL管理器中取出的url xff0c 下载html
  • android apk 签名(平台和普通签名)

    因为做了太多的终端项目 xff0c 客户总会有自己的apk提供 xff0c 这时候各种签名问题就来了 xff0c 最近整理了一下相关知识 xff0c 分享给大家 签名的用处 xff1a 1 应用程序升级 xff1a 如果你希望用户无缝升级到
  • scikit-learn介绍

    在机器学习和数据挖掘的应用中 xff0c scikit learn是一个功能强大的python包 在数据量不是过大的情况下 xff0c 可以解决大部分问题 学习使用scikit learn的过程中 xff0c 我自己也在补充着机器学习和数据
  • 【JUC】CompletableFuture超时处理 配置线程池

    CompletableFuture 简介使用方法代码 简介 xff1a 项目中一个统计的业务场景 xff0c 使用原生的CompletableFuture异步多个任务查询mysql数据 xff0c 少量请求无问题 xff0c 但是测试过程中