1.CompletableFuture
1.1 Completable.supplyAsync
代码示例:
@Slf4j
public class FutureDemo {
public static void main(String[] args) {
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName());
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName());
System.out.println(getTime() + " 开始执行异步任务");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getTime() + " 异步任务3秒执行完成");
return "异步任务执行结果";
});
System.out.println(getTime() + " 主线程执行任务中");
String s = null;
try {
s = completableFuture.get(4, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(getTime() + " 执行异步任务超时");
//可以选择返回继续执行后面的或者抛异常终止
return;
// throw new RuntimeException(e);
}
System.out.println(getTime() + " " + s);
}
public static String getTime() {
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return dateFormat.format(date);
}
}
输出:
2023-04-17 04:33:09 当前线程:main
2023-04-17 04:33:09 主线程执行任务中
2023-04-17 04:33:09 当前线程:ForkJoinPool.commonPool-worker-1
2023-04-17 04:33:09 开始执行异步任务
2023-04-17 04:33:12 异步任务3秒执行完成
2023-04-17 04:33:12 异步任务执行结果
简单分析一下,Completable使用的是ForkJoinPool线程池,Completable.supplyAsync方法返回异步执行结果,通过get方法获取,可以设置超时时间;
1.2 Completable.runAsync
代码示例
public static void main(String[] args) {
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName());
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName());
System.out.println(getTime() + " 开始执行异步任务");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getTime() + " 异步任务3秒执行完成");
});
System.out.println(getTime() + " 主线程执行任务中");
try {
Void task = completableFuture.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(getTime() + " 执行异步任务超时");
//可以选择返回继续执行后面的或者抛异常终止
return;
// throw new RuntimeException(e);
}
System.out.println(getTime() + " over");
}
输出:
2023-04-17 05:37:38 当前线程:main
2023-04-17 05:37:38 主线程执行任务中
2023-04-17 05:37:38 当前线程:ForkJoinPool.commonPool-worker-1
2023-04-17 05:37:38 开始执行异步任务
2023-04-17 05:37:41 异步任务3秒执行完成
2023-04-17 05:37:41 over
1.3 get方法
和supplyAsync不同的是没有返回值,有一个需要注意的点,不管是Completable.supplyAsync还是Completable.runAsync方法,如果没有调用get方法的话,异步任务可能不会执行完,尝试讲get方法注释得到下面的输出结果:
2023-04-17 05:39:48 当前线程:main
2023-04-17 05:39:48 主线程执行任务中
2023-04-17 05:39:48 over
2023-04-17 05:39:48 当前线程:ForkJoinPool.commonPool-worker-1
2023-04-17 05:39:48 开始执行异步任务
get方法会阻塞主线程,等待异步执行完成;
1.4 使用自定义线程池
或者通过自定义线程池的方式,Completable使用默认的线程池创建的线程是守护线程,守护线程会随着主线程的结束而结束,而使用自己创建的线程池,不是守护线程,所以主线程结束之后,可以继续执行;
public static void main(String[] args) {
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("service-thread-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000), factory,
new ThreadPoolExecutor.AbortPolicy());
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName());
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName());
System.out.println(getTime() + " 开始执行异步任务");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getTime() + " 异步任务3秒执行完成");
}, executorService);
System.out.println(getTime() + " 主线程执行任务中");
// try {
// Void task = completableFuture.get(5, TimeUnit.SECONDS);
// } catch (InterruptedException | ExecutionException | TimeoutException e) {
// log.warn(getTime() + " 执行异步任务超时");
// //可以选择返回继续执行后面的或者抛异常终止
// return;
throw new RuntimeException(e);
// }
System.out.println(getTime() + " over");
}
输出:
2023-04-17 05:56:45 当前线程:main
2023-04-17 05:56:45 主线程执行任务中
2023-04-17 05:56:45 over
2023-04-17 05:56:45 当前线程:service-thread-0
2023-04-17 05:56:45 开始执行异步任务
2023-04-17 05:56:48 异步任务3秒执行完成
1.5 Completable.allof示例 等待异步线程执行统一返回
@Slf4j
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//定义线程池
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("service-thread-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000), factory,
new ThreadPoolExecutor.AbortPolicy());
//预准备数据
List<Integer> lists = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
lists.add(i);
}
//数据转换格式
Function<Integer, CompletableFuture<String>> function = item -> CompletableFuture.supplyAsync(() -> {
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName() + " " + item.toString());
return item.toString();
}, executorService);
//进行数据转换
List<CompletableFuture<String>> list1 = lists.stream().map(function).collect(Collectors.toList());
//等待所有异步线程返回
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(list1.toArray(new CompletableFuture[0]));
//这里会阻塞主线程
voidCompletableFuture.get(2, TimeUnit.SECONDS);
list1.stream().map(i -> {
try {
String s = i.get();
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName() + " " + s + " over");
return s;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
executorService.shutdownNow();
}
public static String getTime() {
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return dateFormat.format(date);
}
}
输出
2023-04-18 02:14:16 当前线程:service-thread-0 0
2023-04-18 02:14:16 当前线程:service-thread-2 2
2023-04-18 02:14:16 当前线程:service-thread-1 1
2023-04-18 02:14:16 当前线程:service-thread-0 3
2023-04-18 02:14:16 当前线程:service-thread-2 4
2023-04-18 02:14:16 当前线程:service-thread-0 6
2023-04-18 02:14:16 当前线程:service-thread-2 7
2023-04-18 02:14:16 当前线程:service-thread-0 8
2023-04-18 02:14:16 当前线程:service-thread-2 9
2023-04-18 02:14:16 当前线程:service-thread-1 5
2023-04-18 02:14:16 当前线程:main 0 over
2023-04-18 02:14:16 当前线程:main 1 over
2023-04-18 02:14:16 当前线程:main 2 over
2023-04-18 02:14:16 当前线程:main 3 over
2023-04-18 02:14:16 当前线程:main 4 over
2023-04-18 02:14:16 当前线程:main 5 over
2023-04-18 02:14:16 当前线程:main 6 over
2023-04-18 02:14:16 当前线程:main 7 over
2023-04-18 02:14:16 当前线程:main 8 over
2023-04-18 02:14:16 当前线程:main 9 over
如果不对主线程进行阻塞
@Slf4j
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//定义线程池
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("service-thread-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000), factory,
new ThreadPoolExecutor.AbortPolicy());
//预准备数据
List<Integer> lists = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
lists.add(i);
}
//数据转换格式
Function<Integer, CompletableFuture<String>> function = item -> CompletableFuture.supplyAsync(() -> {
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName() + " " + item.toString());
try {
//进行睡眠
Thread.sleep(item * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return item.toString();
}, executorService);
//进行数据转换
List<CompletableFuture<String>> list1 = lists.stream().map(function).collect(Collectors.toList());
//等待所有异步线程返回
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(list1.toArray(new CompletableFuture[0]));
//这里会阻塞主线程
// voidCompletableFuture.get(2, TimeUnit.SECONDS);
list1.stream().map(i -> {
try {
String s = i.get();
System.out.println(getTime() + " 当前线程:" + Thread.currentThread().getName() + " " + s + " over");
return s;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
executorService.shutdownNow();
}
public static String getTime() {
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return dateFormat.format(date);
}
}
输出
2023-04-18 02:47:15 当前线程:service-thread-0 0
2023-04-18 02:47:15 当前线程:service-thread-1 1
2023-04-18 02:47:15 当前线程:service-thread-2 2
2023-04-18 02:47:15 当前线程:service-thread-0 3
2023-04-18 02:47:15 当前线程:main 0 over
2023-04-18 02:47:16 当前线程:main 1 over
2023-04-18 02:47:16 当前线程:service-thread-1 4
2023-04-18 02:47:17 当前线程:service-thread-2 5
2023-04-18 02:47:17 当前线程:main 2 over
2023-04-18 02:47:18 当前线程:main 3 over
2023-04-18 02:47:18 当前线程:service-thread-0 6
2023-04-18 02:47:20 当前线程:main 4 over
2023-04-18 02:47:20 当前线程:service-thread-1 7
2023-04-18 02:47:22 当前线程:main 5 over
2023-04-18 02:47:22 当前线程:service-thread-2 8
2023-04-18 02:47:24 当前线程:service-thread-0 9
2023-04-18 02:47:24 当前线程:main 6 over
2023-04-18 02:47:27 当前线程:main 7 over
2023-04-18 02:47:30 当前线程:main 8 over
2023-04-18 02:47:33 当前线程:main 9 over
可见在获取异步执行结果后,主线程会执行,异步执行完返回一个主线程执行一个,直到所有的都执行完;
1.6 Completable.join
@Slf4j
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("service-thread-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000), factory,
new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println(getTime() + Thread.currentThread().getName() + "开始执行异步任务11");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(getTime() + Thread.currentThread().getName() + "执行异步任务完成11");
}, executorService);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
System.out.println(getTime() + Thread.currentThread().getName() + "开始执行异步任务22");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(getTime() + Thread.currentThread().getName() + "执行异步任务完成22");
}, executorService);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(f1, f2);
voidCompletableFuture.join();
System.out.println(getTime() + Thread.currentThread().getName() + "over");
executorService.shutdownNow();
}
public static String getTime() {
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return dateFormat.format(date);
}
}
输出
2023-04-18 03:12:44service-thread-0开始执行异步任务11
2023-04-18 03:12:44service-thread-1开始执行异步任务22
2023-04-18 03:12:45service-thread-0执行异步任务完成11
2023-04-18 03:12:49service-thread-1执行异步任务完成22
2023-04-18 03:12:49mainover
和get方法差不多,只是抛出异常方式有区别,建议用get;
1.7 Completable.anyof 有任一结果返回就返回
@Slf4j
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//定义线程池
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("service-thread-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000), factory,
new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println(getTime() + Thread.currentThread().getName() + "开始执行异步任务11");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(getTime() + Thread.currentThread().getName() + "执行异步任务完成11");
}, executorService);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
System.out.println(getTime() + Thread.currentThread().getName() + "开始执行异步任务22");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(getTime() + Thread.currentThread().getName() + "执行异步任务完成22");
}, executorService);
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(f1, f2);
objectCompletableFuture.get();
System.out.println(getTime() + Thread.currentThread().getName() + "over");
executorService.shutdownNow();
}
public static String getTime() {
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return dateFormat.format(date);
}
}
输出
2023-04-18 03:32:36service-thread-0开始执行异步任务11
2023-04-18 03:32:36service-thread-1开始执行异步任务22
2023-04-18 03:32:37service-thread-0执行异步任务完成11
2023-04-18 03:32:37mainover
1.8 CompletableFuture.completedFuture 返回一个计算好的CompletableFuture
@Slf4j
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//定义线程池
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("service-thread-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000), factory,
new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.completedFuture(getList());
List<String> list = listCompletableFuture.get();
System.out.println(getTime() + " " + Thread.currentThread().getName() + " " + JSON.toJSON(list));
executorService.shutdownNow();
}
public static String getTime() {
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return dateFormat.format(date);
}
public static List<String> getList() throws InterruptedException {
List<String> list = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
list.add(i + "");
Thread.sleep(i * 1000);
System.out.println(getTime() + " " + Thread.currentThread().getName() + " " + i);
}
return list;
}
}
输出
2023-04-19 11:05:59 main 0
2023-04-19 11:06:00 main 1
2023-04-19 11:06:02 main 2
2023-04-19 11:06:05 main 3
2023-04-19 11:06:09 main 4
2023-04-19 11:06:09 main ["0","1","2","3","4"]
注意,这是在主线程中执行的;
2. @Async
2.1 简单举例
使用@Async注解开启异步任务,需要在主启动类上加@EnableAsync注解,先来一个简单的例子:
Controller层:
@RestController
@RequiredArgsConstructor
public class AsyncController {
@Autowired
private AsyncService asyncService;
@PostMapping("/async")
public String testAsync() {
return asyncService.testAsync();
}
}
Service层:
@Service
@Slf4j
@RequiredArgsConstructor
public class AsyncServiceImpl implements AsyncService {
@Autowired
private AsyncTask asyncTask;
@Override
public String testAsync() {
log.info(Thread.currentThread().getName() + " start test async");
Future<List<Integer>> listFuture = asyncTask.asyncJob();
List<Integer> list;
try {
//在这里会阻塞主线程直到所有的异步线程执行完返回结果
list = listFuture.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
log.info(JSON.toJSONString(list));
log.info(Thread.currentThread().getName() + " over test async");
return "ok";
}
}
自定义线程池,需要定义线程池名称"AsyncThreadPool":
@Configuration
@Slf4j
public class ThreadPoolConfig {
@Bean(name = "AsyncThreadPool")
public ExecutorService executorService() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-%d").build();
try {
return new ThreadPoolExecutor(
10, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), threadFactory, new ThreadPoolExecutor.AbortPolicy());
} finally {
log.info("async thread pool build");
}
}
}
异步任务逻辑层:
@Component
@Slf4j
public class AsyncTask {
//使用系统默认线程池
// @Async
//使用自定义线程池
@Async("AsyncThreadPool")
public Future<List<Integer>> asyncJob() {
List<Integer> list = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info(Thread.currentThread().getName() + " async job " + i);
list.add(i);
}
return new AsyncResult<>(list);
}
}
执行结果:
使用@Async注解开启异步任务,使用自定义线程池,标注线程池名字,如果不适用自定义线程池,将会使用相同默认线程池,默认线程池的最大线程数和任务队列都是Integer.MAX,并发数过大会导致OOM;
下面是使用系统默认线程池:
2.2 不适用Future.get方法阻塞
不使用Future.get方法去阻塞的话,主线程会先执行完,这种适用于需要快速返回,举个例子,用户购买某一个应用,购买成功之后,马上给用户提示购买成功,而购买服务所需要的一些配置可以在购买成功之后再去执行,先给用户展示购买成功,再去配置,哪怕后面服务的一些配置出问题,也要提示用户购买成功,之后对异步执行过程中的配置异常做补偿处理;
2.3 多个异步任务的处理
@Service
@Slf4j
@RequiredArgsConstructor
public class AsyncServiceImpl implements AsyncService {
@Autowired
private AsyncTask asyncTask;
@Override
public String testAsync() {
log.info(Thread.currentThread().getName() + " start test async");
Future<List<Integer>> listFuture = asyncTask.asyncJob();
Future<List<Integer>> listFuture1 = asyncTask.asyncJobs();
List<Integer> list;
try {
list = listFuture1.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
log.info(JSON.toJSONString(list));
log.info(Thread.currentThread().getName() + " over test async");
return "ok";
}
}
从返回结果可以看出来,listFuture1.get会阻塞主线程,但是异步任务执行完成之后,不再阻塞主线程,主线程执行完毕,另一个异步任务继续执行;
要想异步任务在主线程之前执行完,可以通过调get方法:
List<Future<?>> list1 = Lists.newArrayList();
list1.add(listFuture);
list1.add(listFuture1);
list1.forEach(i->{
try {
i.get(10,TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
});
2.4 @Async失效的情况
@Service
@Slf4j
@RequiredArgsConstructor
public class AsyncServiceImpl implements AsyncService {
@Override
public String testAsync() {
log.info(Thread.currentThread().getName() + " start test async");
Future<List<Integer>> listFuture = asyncJob();
log.info(Thread.currentThread().getName() + " over test async");
return "ok";
}
@Async("AsyncThreadPool")
public Future<List<Integer>> asyncJob() {
List<Integer> list = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info(Thread.currentThread().getName() + " async job " + i);
list.add(i);
}
return new AsyncResult<>(list);
}
}
可见都是在主线程执行的
Spring 在扫描bean的时候会扫描方法上是否包含@Async注解,动态地生成一个子类(即proxy代理类),当这个有注解的方法被调用的时候,实际上是由代理类来调用的。如果这个有注解的方法是被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来的那个 bean,所以就失效了。所以调用方与被调方不能在同一个类,主要是使用了动态代理,同一个类的时候直接调用,不是通过生成的动态代理类调用。一般将要异步执行的方法单独抽取成一个类。
还有就是注解@Async的方法不是public方法、注解@Async的返回值只能为void或者Future、注解@Async方法使用static修饰也会失效;
类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象、在Async 方法上标注@Transactional是没用的,但在Async 方法调用的方法上标注@Transactional 是有效的;