CompletableFuture
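Introduction:
In one of our projects, a statistics feature used plain CompletableFuture to run several MySQL queries asynchronously. A small number of requests caused no problems, but under heavy load during testing the tasks had no timeout, so large numbers of threads sat waiting and the endpoint became slow.
We therefore wrapped CompletableFuture in a utility class that runs tasks on a custom thread pool and applies a timeout, to keep the endpoint stable.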
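The core idea can be sketched on its own, independent of the utility class: race the real task against a second future that fails after a delay, and take whichever completes first. The class name TimeoutRaceSketch, the helpers failAfter and withTimeout, and the simulated slowQuery are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

final class TimeoutRaceSketch {

    // One daemon thread whose only job is to fire timeouts.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-race-sketch");
                t.setDaemon(true);
                return t;
            });

    // A future that fails with TimeoutException once the delay has elapsed.
    static <T> CompletableFuture<T> failAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> timer = new CompletableFuture<>();
        TIMER.schedule(() -> { timer.completeExceptionally(new TimeoutException()); }, timeout, unit);
        return timer;
    }

    // Whichever finishes first wins: the task's value or the timer's exception.
    // exceptionally() maps a timeout (or a failure of the task itself) to the fallback.
    static <T> CompletableFuture<T> withTimeout(Supplier<T> task, T fallback,
                                                long timeout, TimeUnit unit) {
        CompletableFuture<T> timer = failAfter(timeout, unit);
        return CompletableFuture.supplyAsync(task)
                .applyToEither(timer, Function.<T>identity())
                .exceptionally(ex -> fallback);
    }

    // Hypothetical slow query, simulated with a sleep.
    static String slowQuery() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "rows";
    }

    public static void main(String[] args) {
        // The task finishes well before the 1 s limit, so its own value is used.
        System.out.println(withTimeout(() -> "fast", "fallback", 1, TimeUnit.SECONDS).join());
        // The 50 ms limit expires before the 500 ms query, so the fallback is used.
        System.out.println(withTimeout(TimeoutRaceSketch::slowQuery, "fallback", 50, TimeUnit.MILLISECONDS).join());
    }
}
```

Note that the race only releases the caller. The slow task itself keeps running on its worker thread until it finishes, so a timeout limits how long a request waits, not how long the pool stays busy.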
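Usage:
The utility class wraps three methods: supplyAsync(), runAsync() and allOf(). These are the three I use most in my current project.
After adding the utility class to your project, CompletableFuture.supplyAsync() can be replaced directly with FutureUtil.supplyAsync(). The other methods work the same way: just swap the class name. For example: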
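CompletableFuture<String> future1 = FutureUtil.supplyAsync(() -> {
    // task that returns a value
    return "result";
});
CompletableFuture<Void> future2 = FutureUtil.runAsync(() -> {
    // task with no return value
});
// allOf takes the futures to wait for, not a lambda
FutureUtil.allOf(future1, future2).join();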
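The default timeout (the TIMEOUT_VALUE constant, 1500 ms in the source below) can be changed in the utility class. A timeout can also be passed per call, for example: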
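// future1 and future2 are CompletableFutures created earlier
FutureUtil.allOf(1000, TimeUnit.MILLISECONDS, future1, future2);
FutureUtil.runAsync(1000, TimeUnit.MILLISECONDS, () -> {
    // task with no return value
});
FutureUtil.supplyAsync(1000, TimeUnit.MILLISECONDS, () -> {
    // task that returns a value
    return "result";
});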
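As a rough sketch of how these calls fit the statistics scenario, the example below starts two queries concurrently, waits for both, and then reads the results without blocking. It uses plain CompletableFuture so that it runs without the utility class. StatsAggregationSketch, countOrders, countUsers and orZero are hypothetical names. Because the wrapper completes with null on timeout or failure, the null guard is the part that matters when switching to FutureUtil.

```java
import java.util.concurrent.CompletableFuture;

final class StatsAggregationSketch {

    // Hypothetical stand-ins for the MySQL statistics queries.
    static Long countOrders() { return 42L; }
    static Long countUsers() { return 7L; }

    static String loadStats() {
        // With the utility class these would be FutureUtil.supplyAsync(...) calls.
        CompletableFuture<Long> orders = CompletableFuture.supplyAsync(StatsAggregationSketch::countOrders);
        CompletableFuture<Long> users = CompletableFuture.supplyAsync(StatsAggregationSketch::countUsers);

        // Wait for both queries; FutureUtil.allOf(orders, users) would bound this wait.
        CompletableFuture.allOf(orders, users).join();

        // getNow never blocks; a missing or null result falls back to 0.
        return "orders=" + orZero(orders.getNow(null)) + ", users=" + orZero(users.getNow(null));
    }

    static long orZero(Long value) {
        return value == null ? 0L : value;
    }

    public static void main(String[] args) {
        System.out.println(loadStats()); // prints orders=42, users=7
    }
}
```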
For the details, read the utility class source, posted below 👇
Code:
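package com.wh.org.basecommon.utils.futures;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Wraps CompletableFuture so that every task runs on a dedicated thread pool
 * and is raced against a timeout.
 */
@Slf4j
public class FutureUtil {

    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    // Default timeout, used when the caller does not pass one.
    private static final int TIMEOUT_VALUE = 1500;
    private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;

    /** Single daemon thread that only fires timeouts; it never runs business logic. */
    public static final class Delayer {
        static final ScheduledThreadPoolExecutor delayer;

        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            // Remove cancelled timeout tasks from the queue immediately.
            delayer.setRemoveOnCancelPolicy(true);
        }

        static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }

        static final class DaemonThreadFactory implements ThreadFactory {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureScheduler");
                return t;
            }
        }
    }

    // Worker pool: core size = CPU count, max size = 3 x CPU count,
    // idle extra threads are released after 3 seconds, at most 20 queued tasks.
    // CallerRunsPolicy: when the pool and the queue are both full, the submitting
    // thread runs the task itself, so the call blocks until the task finishes.
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            AVAILABLE_PROCESSORS,
            3 * AVAILABLE_PROCESSORS,
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );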
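
    /** Runs the supplier on the worker pool with the default timeout. */
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
        return supplyAsync(TIMEOUT_VALUE, TIMEOUT_UNIT, supplier);
    }

    /**
     * Runs the supplier on the worker pool and races it against a timeout.
     * On timeout or failure the error is logged and the future completes with null.
     * The timeout only releases the caller; the underlying task is not interrupted.
     */
    public static <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit, Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout, unit), Function.identity())
                .exceptionally(throwable -> {
                    // Log the message together with the full stack trace.
                    log.error("supplyAsync failed or timed out", throwable);
                    return null;
                });
    }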

    /** Runs the task on the worker pool with the default timeout. */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return runAsync(TIMEOUT_VALUE, TIMEOUT_UNIT, runnable);
    }

    /** Runs the task on the worker pool; a timeout or failure is logged and swallowed. */
    public static CompletableFuture<Void> runAsync(long timeout, TimeUnit unit, Runnable runnable) {
        return CompletableFuture.runAsync(runnable, threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout, unit), Function.identity())
                .exceptionally(throwable -> {
                    // Log the message together with the full stack trace.
                    log.error("runAsync failed or timed out", throwable);
                    return null;
                });
    }
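
    /** Waits for all given futures with the default timeout. */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... futures) {
        return allOf(TIMEOUT_VALUE, TIMEOUT_UNIT, futures);
    }

    /**
     * Completes when all given futures have completed or the timeout expires,
     * whichever comes first. A timeout or failure is logged and swallowed.
     */
    public static CompletableFuture<Void> allOf(long timeout, TimeUnit unit, CompletableFuture<?>... futures) {
        return CompletableFuture.allOf(futures)
                .applyToEither(timeoutAfter(timeout, unit), Function.identity())
                .exceptionally(throwable -> {
                    // Log the message together with the full stack trace.
                    log.error("allOf failed or timed out", throwable);
                    return null;
                });
    }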

    /**
     * Returns a future that completes exceptionally with TimeoutException after the given delay.
     * The scheduled task is not cancelled if the real task finishes first; it fires later with no effect.
     */
    public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Delayer.delay(() -> { result.completeExceptionally(new TimeoutException()); }, timeout, unit);
        return result;
    }

    /** Same as timeoutAfter(long, TimeUnit), using the default timeout. */
    public static <T> CompletableFuture<T> timeoutAfter() {
        return timeoutAfter(TIMEOUT_VALUE, TIMEOUT_UNIT);
    }
}
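The class above follows the Java 8 pattern of racing against a hand-made timeout future. On Java 9 or later, CompletableFuture provides orTimeout and completeOnTimeout, which cover much of the same ground. The sketch below is one possible alternative under that assumption, not the approach used in this article. BuiltInTimeoutSketch, POOL, supplyWithFallback, supplyOrFail and slowQuery are made-up names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class BuiltInTimeoutSketch {

    // Small daemon worker pool standing in for the custom thread pool.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "builtin-timeout-sketch");
        t.setDaemon(true);
        return t;
    });

    // Completes with the fallback value if the task has not finished in time (Java 9+).
    static <T> CompletableFuture<T> supplyWithFallback(Supplier<T> supplier, T fallback,
                                                       long timeout, TimeUnit unit) {
        return CompletableFuture.supplyAsync(supplier, POOL)
                .completeOnTimeout(fallback, timeout, unit);
    }

    // Completes exceptionally with TimeoutException if the task has not finished in time (Java 9+).
    static <T> CompletableFuture<T> supplyOrFail(Supplier<T> supplier, long timeout, TimeUnit unit) {
        return CompletableFuture.supplyAsync(supplier, POOL)
                .orTimeout(timeout, unit);
    }

    // Hypothetical slow query, simulated with a sleep.
    static String slowQuery() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "rows";
    }

    public static void main(String[] args) {
        // The 50 ms limit expires before the 500 ms query, so the fallback is returned.
        System.out.println(supplyWithFallback(BuiltInTimeoutSketch::slowQuery, "fallback",
                50, TimeUnit.MILLISECONDS).join());
    }
}
```

As with the hand-written version, neither method interrupts the running task; the worker thread stays occupied until the query returns.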