JUC 九. CompletableFuture

2023-11-06

一. CompletableFuture

  1. 在学习FutureTask时发现调用get()获取任务结果中存在的问题,引出CompletableFuture
  2. CompletableFuture 是个类实现了Future接口与CompletionStage两个接口,其中CompletionStage表示异步计算过程中的某一个阶段,一个阶段完成以后可能会触发的另外一个阶段,一个阶段的执行可能是被另外一个阶段完成或多个阶段一起触发的
    在这里插入图片描述
  3. 核心静态方法
  1. runAsync(): 无输入,无返回值
  2. supplyAsync(): 无输入,有返回值
  1. runAsync()简单示例
  1. runAsync(Runnable runnable) 线程方式,该方式底层默认会提供一个线程池
  2. runAsync(Runnable runnable,Executor executor),线程池方式
import java.util.concurrent.*;
public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.runAsync(Runnable runnable)
        CompletableFuture<Void> c = CompletableFuture.runAsync(() -> {
            //调用getName()方法会发现,在执行runAsync()时,如果不传递线程池,
            //底层会提供一个线程池ForkJoinPool.commonPool-worker-1
            System.out.println("异步线程执行 ThreadName:" + Thread.currentThread().getName());
        });

        //2.runAsync(Runnable runnable,Executor executor)
        //2.1新建一个线程池: 核心线程数1, 最大线程数20, 空闲线程存活时间20秒,任务队列50,拒绝策略: AbortPolicy当超过时会抛出异常
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                20,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        //2.2通过CompletableFuture异步执行任务,并使用线程池
        CompletableFuture<Void> c2 = CompletableFuture.runAsync(() -> {
            System.out.println("异步线程执行 带线程池方式 ThreadName:" + Thread.currentThread().getName());
        }, executor);

        //3.runAsync()方法没有返回值,当调用get方法时返回null
        System.out.println(c.get());
        System.out.println(c2.get());
		
		//停止线程池运行
        executor.shutdown();
    }
}
  1. supplyAsync() 简单示例(此处的get与Future中的get相同还是还是会阻塞的)
import java.util.concurrent.*;
public class SupplyAsyncTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.supplyAsync(Supplier<U> supplier)
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务执行 带返回值模式");
            //手动休眠,模拟任务执行消耗时间
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "异步任务执行完毕返回结果数据";
        });
        //2.获取异步执行结果
        System.out.println(c.get());

        //3.supplyAsync(Runnable runnable,Executor executor)
        //3.1新建一个线程池: 核心线程数1, 最大线程数20, 空闲线程存活时间20秒,任务队列50,拒绝策略: AbortPolicy当超过时会抛出异常
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                20,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        //4.调用执行
        CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务执行 带返回值, 自定义线程池模式");
            //手动休眠,模拟任务执行消耗时间
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "异步任务执行完毕返回结果数据";
        }, executor);

        //5.获取异步执行结果
        System.out.println(c2.get());

        //停止线程池运行
        executor.shutdown();
    }
}
  1. get() 或 join()方法 阻塞获取结果数据示例2
	public static void main(String[] args) throws InterruptedException {

        String result = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //返回结果数据
            return "111数据";
        }).join(); //通过join或get方法获取异步任务结果数据,由于get需要处理异常,直接使用join
        System.out.println(result);
        System.out.println("主线程工作完毕======>");

        TimeUnit.SECONDS.sleep(5);
    }

计算结果完成时回调

  1. 上方 supplyAsync() 的简单示例中调用get()方法获取任务执行结果数据发现还是阻塞的,下方出一个非阻塞示例, 对下方代码解释
  2. whenComplete()/exceptionally(): CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法(其中whenComplete与whenCompleteAsync区别: 当前线程执行与交给异步线程或线程池异步执行,下方示例中获取异步任务,不会阻塞主线程的执行)
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
	public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1.通过CompletableFuture调用supplyAsync()异步执行任务,返回结果数据
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> {
            //手动休眠,模拟任务执行消耗时间
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //返回结果数据
            return "111数据";
        }).whenComplete((v, e) -> {
            //2.whenComplete当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action
            if (null == e) {
                System.out.println("supplyAsync执行返回数据result=: " + v);
            }
        }).exceptionally(e -> {
            //3.当发生异常时可以执行特定的Action
            System.out.println(e.getMessage());
            return null;
        });

        //4.执行是先打印出该语句,也就是说在whenComplete中获取supplyAsync 或 thenApply
        //或 thenApplyAsync异步任务中的执行结果不会影响到主线程的执行
        System.out.println("主线程工作完毕======>");
        //5.只要不调用get()方法就不回阻塞
        c.get()
        c.joio() //与get方法功能相同都会阻塞获取数据,不同的是join不需要捕获或抛出

        //注意点,测试时主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻结束
        TimeUnit.SECONDS.sleep(5);
    }

一个线程的执行依赖另一个线程

  1. 假设a线程执行返回执行结果数据,b线程执行,需要a线程的结果数据才能执行
  2. 当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class SupplyAsyncTest2 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1.通过CompletableFuture调用supplyAsync()异步执行任务,返回结果数据
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> {
            //手动休眠,模拟任务执行消耗时间
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //返回结果数据
            return "111数据";
        }).thenApply(f -> {
            //2.当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化
            //通过调用thenApply()同步执行第二个带返回结果任务,
            //第二个任务的执行需要第一个任务返回的结果数据
            //此处的f就是supplyAsync()执行完毕后返回的数据
           

            //返回结果数据
            return f + "||222数据then";
        }).thenApplyAsync(g -> {
            //3.通过调用thenApplyAsync()异步执行第三个任务,第三个任务
            //的执行需要上面任务执行完毕返回的数据,也就是g

            //返回数据
            return g + "||333数据thenAsync";
        }).whenComplete((v, e) -> {
            //4.whenComplete当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action
            if (null == e) {
                System.out.println("supplyAsync与thenApply执行完毕返回数据result=: " + v);
            }
        }).exceptionally(e -> {
            //5.当发生异常时可以执行特定的Action
            System.out.println(e.getMessage());
            return null;
        });

        //6.执行是先打印出该语句,也就是说在whenComplete中获取supplyAsync 或 thenApply
        //或 thenApplyAsync异步任务中的执行结果不会影响到主线程的执行
        System.out.println("主线程工作完毕======>");

        //注意点,测试时主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻结束
        TimeUnit.SECONDS.sleep(5);
    }
}

二. 根据案例再次了解 CompletableFuture.supplyAsync()

  1. 需求: 对数据库数据进行指定处理,例如过滤,清洗,格式化返回等等
  2. 代码(代码中有解释为什么CompletableFuture.supplyAsync() 会快,异步多线程)
import lombok.Builder;
import lombok.Data;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class PriceController {

    /**
     * 生成假数据
     * @return
     */
    public static List<RoomPriceInfo> getPrice() {
        List<RoomPriceInfo> priceList = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            double price = Math.random();
            BigDecimal b = new BigDecimal(price);
            b.setScale(2, BigDecimal.ROUND_HALF_UP);
            RoomPriceInfo priceInfo = RoomPriceInfo
                    .builder()
                    .memberType("房型A" + i)
                    .roomPrice(b)
                    .build();
            priceList.add(priceInfo);
        }
        return priceList;
    }

    /**
     * 不使用 CompletableFuture 模式
     * @param priceList
     * @return
     */
    public static List<String> formatPrices(List<RoomPriceInfo> priceList) {
        return priceList.stream()
                .map(price -> {
                    return String.format("%s price is %.2f", price.getMemberType(), price.getRoomPrice());
                })
                .collect(Collectors.toList());
    }

    /**
     * 使用 CompletableFuture.supplyAsync 模式
     * 该方法中有调两次map()方法进行类型转换
     * 1.priceList转换为stream流后调用map方法,在map方法中通过CompletableFuture.supplyAsync()异步任务方式去处理数据
     * 后续调用collect(Collectors.toList())转换为集合实际集合泛型的类型是<CompletableFuture>类型,可以理解为开启了多个任务去处理数据
     * 2.在上步骤中拿到返回的实际是List<CompletableFuture>数据后再次转换为stream流,map类型转换获取每个CompletableFuture任务结果数据
     * 3.为什么比基础模式下快,因为第一个map中通过CompletableFuture.supplyAsync使用多线程异步开启多个任务多次去处理数据,然后通过多个任务Future获取执行结果,并行所以快
     *
     * @param priceList
     * @return
     */
    public static List<String> taskPrices(List<RoomPriceInfo> priceList) {
        return priceList.stream()
                .map(price -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", price.getMemberType(), price.getRoomPrice())))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

@Builder
@Data
class RoomPriceInfo {
    private String memberType;
    private BigDecimal roomPrice;
}
  1. 运行测试(发现使用了CompletableFuture.supplyAsync只需要6,不使用的需要68)
 	/**
     * 运行测试
     * @param args
     */
    public static void main(String[] args) {
        //1.模拟数据库生成假数据
        List<RoomPriceInfo> priceList = getPrice();

        //2.基础模式下处理数据并计算执行时间
        long l = System.currentTimeMillis();
        List<String> result = formatPrices(priceList);
        System.out.println(result);
        System.out.println("formatPrices耗时: " + (System.currentTimeMillis() - l));

        //3.CompletableFuture.supplyAsync模式处理数据并计算执行时间
        long l2 = System.currentTimeMillis();
        List<String> result2 = taskPrices(priceList);
        System.out.println(result2);
        System.out.println("taskPrices耗时: " + (System.currentTimeMillis() - l2));
    }
}

在这里插入图片描述

四. CompletableFuture 常用api简介

  1. 参考博客
  2. 上面我们学习了
  1. runAsync() 无返回值
  2. supplyAsync() 有返回值
  3. whenComplete() / whenCompleteAsync() / exceptionally() 计算结果完成,或者抛出异常的时候,可以执行特定的Action
  4. thenApply() / thenApplyAsync() 当一个线程依赖另一个线程时,使用该方法来把这两个线程串行化
  1. CompletableFuture 接口分类
  1. 获取结果和触发计算
  2. 对计算结果进行处理
  3. 对计算结果进行消费
  4. 对计算结果进行选用
  5. 对计算结果进行合并
  6. 任务之间的顺序执行任务编排

获取结果和触发计算

  1. api
  1. get(): 阻塞获取结果数据
  2. get(long timeout, TimeUnit unit): 获取数据,指定阻塞等待时间,指定时间内获取不到则不再获取
  3. getNow(T valueifAbsent): 获取结果,若当前没计算完毕,则给一个默认的结果
  4. join(): 阻塞获取结果与get()功能相同,但是不用处理异常
  5. boolean complete(T value): 是否打断get()方法,试get方法返回当前"T value"数据与getNow()有点类似,注意点complete方法返回的是boolean值,与get()配合使用,先执行complete()方法去打断,然后执行get(),假设在打断前结果已经计算完毕get方法返回实际数据,否则返回的就是complete()中传递的数据
  1. 示例
	//1.supplyAsync(Supplier<U> supplier)
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务执行 带返回值模式");
            //手动休眠,模拟任务执行消耗时间
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "异步任务执行完毕返回结果数据";
        });
        //2.获取异步执行结果
        System.out.println(c.get());
        //指定等待时间
        System.out.println(c.get(2L, TimeUnit.SECONDS));
        //若未计算完毕,则返回传递的兜底数据"AAA"否则返回实际数据
        System.out.println(c.getNow("AAAA"));
        //打断获取结果的执行,若结果已经计算完毕返回false,get方法返回实际数据
        //若没计算完毕返回true,get方法返回"BBBB"
        boolean b = c.complete("BBBB");
        c.get();
        System.out.println();

对计算结果进行处理

  1. api
  1. thenApply() / thenApplyAsync(): 当一个线程依赖另一个线程时,使用该方法来把这两个线程串行化,也可理解为拿到结果后进行后续处理操作
  2. handle相关 : 执行任务完成时对结果的处理
  1. 解释 handle 方法和 thenApply 方法处理方式基本一样,不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行(在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作)
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
  1. 查看thenApply()存在的问题: 在第一个thenApply中出现了异常,此时整个方法的逻辑是直接跳到whenComplete()–>exceptionally()去执行,假设实际需求中第一个thenApply中可能会出现某种可预知异常,当出现指定异常后,我们要在第二个thenApply中进行其他操作,最终执行到whenComplete(), 或者当前方法中如果没有whenComplete或exceptionally怎么办
	public static void test() {
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenApply(r -> {
            //第一个thenApply中发生异常
            int n = 10 / 0;
            return r + 2;
        }).thenApply(i -> {
            return i + 3;
        }).whenComplete((v, e) -> {
            if (null == e) {
                System.out.println(v);
            }else {
                System.out.println(e.getMessage());
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
    }

在这里插入图片描述
4. handle()示例,该方法可以输入两个参数(请求参数,异常数据), 注意点: 上层handle中的异常只能被第一个下层接收到,如果第一个下层的handle接收到异常后给处理掉了,后续的handle或whenComplete,或exceptionally中都将捕获不到这个异常,可以理解为try-fanlly

	public static void test() {
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> {
            return 1;
        }).handle((r, e) -> {
            int n = 10 / 0;
            return r + 2;
        }).handle((r, e) -> {
        	//此处能拿到上层的异常,但是如果当前handle中没有再往外抛异常,则后续的handle
        	//或whenComplete,或exceptionally都将接收不到异常
            if (e != null) {
                System.out.println("上层handle任务中发生异常");
                return 100;
            }
            return r + 2;
        }).handle((i, e) -> {
            return i + 3;
        }).whenComplete((v, e) -> {
            if (null == e) {
                System.out.println(v);
            } else {
                System.out.println(e.getMessage());
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
    }

对计算结果进行消费

  1. thenAccept(): 与 thenApply 不同的是,thenApply有返回值,而thenAccept 没有返回值
  2. 示例
	public static void test() {
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenAccept(r -> {
            System.out.println("接收数据r: " + r);
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return null;
        });
    }

对计算结果进行合并

  1. thenCombine(): 两个 CompletionStage 的任务都执行完成后,把两个任务结果一块交给 thenCombine 来处理,并返回数据
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
  1. thenAcceptBoth(): 两个 CompletionStage 的任务都执行完成后,把两个任务结果一块交给 thenAcceptBoth来处理,无返回值
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);
  1. 示例
	private static void thenCombineTest(){
		//1.thenCombine()有返回值
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            //任务1返回数据
            return 3;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            //任务2返回数据
            return 2;
        }), (r1, r2) -> {
            //r1是第一个任务执行结果,拿到结果后会等待第二个任务执行完毕
            //r2是第二个任务执行结果,第一个任务结果与第二个任务结果都拿到后
            //将两个任务结果一块处理
            return r1 * r1;
        });

        //打印结果为6
        System.out.println(completableFuture.join());
    }

	//2.thenAcceptBoth():消费,无返回值
	 CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            //任务1返回数据
            return 3;
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            //任务2返回数据
            return 2;
        }), (r1, r2) -> {
            //r1是第一个任务执行结果,拿到结果后会等待第二个任务执行完毕
            //r2是第二个任务执行结果,第一个任务结果与第二个任务结果都拿到后
            //将两个任务结果一块处理
            System.out.println(r1 * r1);
        });

        //打印结果为6
        System.out.println(completableFuture.join());

任务之间的顺序执行任务编排

顺序执行

//1.任务a执行完,执行b任务,并且b任务不需要a的结果(无入参,无返回值)
 CompletableFuture<Void> thenRun(Runnable action)
 //2.任务a执行完执行b任务,b需要a的结果,但是b无返回值(有输入,无返回)
 CompletableFuture<Void> thenAccept(Consumer<? super T> action)
 //3.任务a执行完执行b任务,b需要a的执行结果,并且b有返回值(有输入,有返回)
 <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
 //4.允许对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作
 <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

根据响应速度选择任务结果

  1. applyToEither(): 连接任务,谁先返回结果就用谁,有返回值
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
  1. acceptEither (): 连接任务,谁先返回结果就用谁,无返回值
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
  1. 示例
 	/**
     * 通过applyToEither连接发起的任务
     * 哪个任务先执行完毕获取到结果,就使用哪个, 例如此处
     * 第一步中发起的任务内部手动休眠了2秒,
     * 最终r=2,使用的是第二步发起的任务执行得到的结果
     */
    public static void test() {
        //1.通过CompletableFuture.supplyAsync异步执行任务并返回结果数据
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
            //2.手动休眠模拟业务逻辑执行耗时
            try {
                TimeUnit.SECONDS.sleep(3L);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 1;
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            //3.通过applyToEither()再次执行一个CompletableFuture.supplyAsync异步任务
            return 2;

        }), r -> {
            //4.返回最终结果r
            return r;
        });

        System.out.println("result: " + result.join());
    }

runAfterEither()

  1. 解释: 两个CompletionStage,任何一个完成了都会执行下一步的操作
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

runAfterBoth()

  1. 解释: 两个CompletionStage,都完成了计算才会执行下一步的操作
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

五. CompletableFuture 底层原理及总结

  1. 首先复习一下线程有哪几种创建方式
  1. 继承Thread类,重新run方法,调用start()启动线程
  2. 实现Runnable接口,实现run()方法
  3. 实现Callable接口,实现call()方法
  4. 线程池方式Executor 的工具类可以创建三种类型的普通线程池: FixThreadPool(int n); 固定大小的线程池, SingleThreadPoolExecutor :单线程池, CashedThreadPool(); 缓存线程池,
  1. FutureTask原理: 自己理解的是跟线程通讯有点像,有两个线程A与B,假设B线程执行完毕后有返回结果,B线程中添加用来存储结果的属性,添加用来标识是否存储完毕的属性,run方法执行完毕后要对存储结果数据的属性赋值,修改标识为完毕状态,调用notify()方法获取锁池中休眠的线程,提供获取执行结果数据的方法,方法中首先判断是否添加完毕,如果没有则调用wait() 方法,进入释放锁并进入等待状态
  2. FutureTask缺点, 如何解决这个缺点
  1. 也不能说是缺点: 在FutureTask任务执行后,我们可以调用get()方法获取任务执行结果,但是get()方法是阻塞的,假设在调用get方法后,FutureTask中的任务没有执行完毕,则get就一直阻塞等待到任务执行完毕返回结果值才能继续执行
  2. 解决get()方法阻塞问题,使用 “get(long timeout, TimeUnit unit)”, 指定获取结果时的等待时间,指定时间的获取不到则抛出超时异常, 或者使用futureTask调用isDone()查询是否获取到了结果,然后再执行get方法
  1. CompletableFuture API分类
  1. 获取结果和触发计算
  1. get(): 阻塞获取结果数据
  2. get(long timeout, TimeUnit unit): 获取数据,指定阻塞等待时间,指定时间内获取不到则不再获取
  3. getNow(T valueifAbsent): 获取结果,若当前没计算完毕,则给一个默认的结果
  4. join(): 阻塞获取结果与get()功能相同,但是不用处理异常
  5. boolean complete(T value): 是否打断get()方法,试get方法返回当前"T value"数据与getNow()有点类似,注意点complete方法返回的是boolean值,与get()配合使用,先执行complete()方法去打断,然后执行get(),假设在打断前结果已经计算完毕get方法返回实际数据,否则返回的就是complete()中传递的数据
  1. 对计算结果进行处理
  1. thenApply() / thenApplyAsync(): 当一个线程依赖另一个线程时,使用该方法来把这两个线程串行化,也可理解为拿到结果后进行后续处理操作
  2. handle相关 : 执行任务完成时对结果的处理
  1. 对计算结果进行消费: thenAccept(): 与 thenApply 不同的是,thenApply有返回值,而thenAccept 没有返回值
  2. 对计算结果进行选用
  3. 对计算结果进行合并:
  1. thenCombine(): 两个 CompletionStage 的任务都执行完成后,把两个任务结果一块交给 thenCombine 来处理,并返回数据
  2. thenAcceptBoth(): 两个 CompletionStage 的任务都执行完成后,把两个任务结果一块交给 thenAcceptBoth来处理,无返回值
  1. 任务之间的顺序执行任务编排
  1. applyToEither(): 连接任务,谁先返回结果就用谁,有返回值
  2. acceptEither (): 连接任务,谁先返回结果就用谁,无返回值
  3. runAfterEither(): 两个CompletionStage,任何一个完成了都会执行下一步的操作
  4. runAfterBoth(): 两个CompletionStage,都完成了计算才会执行下一步的操作

底层原理

  1. ompletableFuture分别实现了Future和CompletionStage两个接口,通过CompletionStage给CompletableFuture提供接口,来实现非常复杂的异步计算工作

工作示例

	/**
     * 获取结账账单
     *
     * @param billInfoReqDTO
     * @return
     */
    public BizResponseData<BillInfoRespDTO> getOrderBill(BillInfoReqDTO billInfoReqDTO) throws CheckException {
        //BizStatusCodeEnum.BILL_GETORDER_THIRD_PARTY_BACK_ERROR
        //BizStatusCodeEnum.BILL_GETORDER_THIRD_PARTY_SERVICE_ERROR
        //BizStatusCodeEnum.BILL_GETORDER_INSIDE_ERROR
        List<BillItemDTO> accntList = billInfoReqDTO.getList();
        if (CollectionUtils.isEmpty(accntList)) {
            return BizResponseData.error(BizStatusCodeEnum.BILL_GETORDER_INSIDE_ERROR);
        }
        //1.获取并校验酒店配置信息
        PmsHotelInfoDO hotelInfo = this.checkPmsHotelInfo(billInfoReqDTO.getHotelCode(), BizStatusCodeEnum.BILL_GETORDER_INSIDE_ERROR);
        /*List<CompletableFuture<BizResponseData<BillInfoRespDTO>>> list = accntList.stream().map(accnt->
                CompletableFuture.supplyAsync(() -> {
                    BillPayReq billPayReq = BillPayReq.builder().acctnum(Integer.parseInt(accnt.getAccnt())).build();
                    BizResponseData<JSONObject> exchange = templateService.exchange(hotelInfo, PMSApiPathEnum.GET_ACCNUMT_BY_RM, billPayReq, billPayReq, BizStatusCodeEnum.BILL_GETORDER_THIRD_PARTY_BACK_ERROR);
                    if (!BizStatusCodeEnum.SUCCESS.getCode().equals(exchange.getErrorcode())) {
                        return exchange;
                    }
                    Optional.ofNullable(exchange)
                            .map(resp -> (JSONObject) exchange.getData())
                            .map(data -> JSON.toJavaObject(exchange.getData(), BillPayResp.class))


                }).whenComplete((result, err) -> {
                    BillInfoRespDTO resp = new BillInfoRespDTO();
                    BizResponseData.ok(resp);
                }).exceptionally(e -> {

                    System.out.println(e.getMessage());
                    return BizResponseData.ok();
                })
        ).collect(Collectors.toList());
        CompletableFuture.allOf(list).join();*/

        List<BizResponseData> list = accntList.stream()
                .map(accnt -> CompletableFuture.supplyAsync(() -> {
                    BillPayReq billPayReq = BillPayReq.builder().acctnum(Integer.parseInt(accnt.getAccnt())).build();
                    return templateService.exchange(hotelInfo, PMSApiPathEnum.GET_ACCNUMT_BY_RM, billPayReq, billPayReq, BizStatusCodeEnum.BILL_GETORDER_THIRD_PARTY_BACK_ERROR);
                }))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .map(resp -> {
                    if (!BizStatusCodeEnum.SUCCESS.getCode().equals(resp.getErrorcode())) {
                        return BizResponseData.error(resp.getErrorcode(), resp.getMsg());
                    }
                    BillInfoRespDTO billInfoRespDTO = new BillInfoRespDTO();

                    Optional<List<IntTranInfo>> optionalIntTranInfo = Optional.ofNullable(resp.getData())
                            .map(respJson_ -> JSON.toJavaObject(resp.getData(), BillPayResp.class))
                            .map(BillPayResp::getAcctChkOTraninfo)
                            .map(IntCheckoutTranInfo::getAcctTrans);
                    if (!optionalIntTranInfo.isPresent()) {
                        return BizResponseData.ok();
                    }
                    List<BillInfoPaysRespDTO> billInfoPaysRespDTOS = new ArrayList<>();
                    List<BillInfoBillsRespDTO> billInfoBillsRespDTOS = new ArrayList<>();
                    Long totalPay = 0L;
                    Long totalBill = 0L;
                    Long totalDeposit = 0L;
                    for (IntTranInfo intTranInfo : optionalIntTranInfo.get()) {
                        //Trantype为2表示付款
                        if ("2".equals(intTranInfo.getTrantype())) {
                            BillInfoPaysRespDTO billInfoPaysRespDTO = new BillInfoPaysRespDTO();
                            BillTypeIdReqDTO billTypeCodeReqDTO = new BillTypeIdReqDTO(billInfoReqDTO.getHotelCode(), intTranInfo.getTranCode(), 1);
                            BillTypeDTO billTypeDTO = configRemoteService.getBillIdByCode(billTypeCodeReqDTO).getData();
                            billInfoPaysRespDTO.setPayType(billTypeDTO == null ? 0 : billTypeDTO.getBillId());
                            billInfoPaysRespDTO.setAccnt("");
                            billInfoPaysRespDTO.setPayId(intTranInfo.getTranCode());
                            billInfoPaysRespDTO.setTime(DateUtil.parse(intTranInfo.getPostTime().substring(0, 19)).getTime() / 1000);
                            billInfoPaysRespDTO.setRoomNo(intTranInfo.getRoomNum());
                            billInfoPaysRespDTOS.add(billInfoPaysRespDTO);
                            totalPay += billInfoPaysRespDTO.getPayAmount();
                            long amt = DecimalUtil.toFenOfLongByBigDecimal(intTranInfo.getTranAmt());
                            billInfoPaysRespDTO.setPayAmount(amt);
                            totalDeposit += amt;

                        } else {
                            BillInfoBillsRespDTO billInfoBillsRespDTO = new BillInfoBillsRespDTO();
                            BillTypeIdReqDTO billTypeCodeReqDTO = new BillTypeIdReqDTO(billInfoReqDTO.getHotelCode(), intTranInfo.getTranCode(), 2);
                            BillTypeDTO billTypeDTO = configRemoteService.getBillIdByCode(billTypeCodeReqDTO).getData();
                            billInfoBillsRespDTO.setBillType(billTypeDTO == null ? 0 : billTypeDTO.getBillId());
                            billInfoBillsRespDTO.setPrice(DecimalUtil.toFenOfLongByBigDecimal(intTranInfo.getTranAmt()));
                            billInfoBillsRespDTO.setAccnt("");
                            billInfoBillsRespDTO.setBillId(intTranInfo.getTranCode());
                            billInfoBillsRespDTO.setTime(DateUtil.parse(intTranInfo.getPostTime().substring(0, 19)).getTime() / 1000);
                            billInfoBillsRespDTO.setCount(1);
                            billInfoBillsRespDTO.setRoomNo(intTranInfo.getRoomNum());
                            billInfoBillsRespDTOS.add(billInfoBillsRespDTO);
                            totalBill += billInfoBillsRespDTO.getPrice();
                        }
                    }
                    billInfoRespDTO.setHotelCode(billInfoReqDTO.getHotelCode());
                    billInfoRespDTO.setPmsNo(billInfoReqDTO.getPmsNo());
                    billInfoRespDTO.setBills(billInfoBillsRespDTOS);
                    billInfoRespDTO.setPays(billInfoPaysRespDTOS);
                    billInfoRespDTO.setBalance(totalBill - totalPay);
                    billInfoRespDTO.setTotalBill(totalBill);
                    billInfoRespDTO.setTotalDeposit(totalDeposit);
                    billInfoRespDTO.setTotalPay(totalPay);
                    return new BizResponseData(BizStatusCodeEnum.SUCCESS, billInfoRespDTO);
                }).collect(Collectors.toList());
        return list.get(0);
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

JUC 九. CompletableFuture 的相关文章

随机推荐

  • tp5中树状图数据格式的返回

    1 条件 数据库中的数据必须是无限递归数据 2 数据处理 public function treeListApi list this gt db class gt field id name parent id gt where is de
  • str功能的实现

    1 strcat 功能 把src所指字符串添加到dest结尾处 覆盖dest结尾处的 0 并添加 0 程序 char strcat char dest const char src char addr dest int i 0 j 0 as
  • 私服的搭建

    私服 基于nexus 3 20 1 下载地址 https help sonatype com repomanager3 download 目前该下载地址国内下载根本下载不了 你可以去百度找找看 分享一个百度网盘下载链接 链接 https p
  • Oracle 索引

    1 索引介绍 索引是用于加速数据存取的数据对象 是对数据表中一个或多个列进行排序的结构 合理的使用索引可以大大降低I O次数 从而提高数据访问性能 2 问题 为什么需要索引 Select from scott emp where empno
  • python(五)函数、模块、包

    模块 包的概念 在Python中 一个 py文件就称之为一个模块 Module 我们在编写程序的时候 也经常引用其他模块 包括Python内置的模块和来自第三方的模块 你也许还想到 如果不同的人编写的模块名相同怎么办 为了避免模块名冲突 P
  • oracle多个表count值求和

    union和union all关键字都是将两个结果集合并为一个 但这两者从使用和效率上来说都有所不同 union在进行表链接后会筛选掉重复的记录 所以在表链接后会对所产生的结果集进行排序运算 删除重复的记录再返回结果 而union all只
  • docker安装fastDFS

    一 docker安装 1 搜索镜像 2 拉取镜像 最新版本 docker pull delron fastdfs 3 使用镜像构建容器 3 1 创建tracker容器 docker run dti network host name my
  • js-yaml简单使用

    安装 js yaml npm install js yaml index js let fs require fs let content fs readFileSync text yaml encoding utf8 let yaml r
  • img加载图片的三种方式

    方式一 src指向图像的位置 最常用的一种方式 无需搭配后端代码 img src img boat gif alt Big Boat 方式二 src执行后台路径 获取图片的字节数组 前端代码 img src getImage alt Big
  • ubuntu20.04网络配置

    安装net tools sudo apt get install net tools 2 ifconfig查看网卡设备 其中flags表中 running表示正在使用中 查看设备核心网络路由表 route n Destination 目标网
  • 将AWS S3大文件文件上传相关的API集成为js文件,功能包括 多文件并行上传、文件分片上传、断点续传、文件分片合成、上传暂停、取消上传、文件上传进度条显示

    地址 https github com gk 1213 easy s3 tree main 效果 直接运行vue example文件就行 easy s3 将AWS S3大文件文件上传相关的API集成为js文件 功能包括多文件并行上传 文件分
  • 【C-函数】scanf函数原理

    文章目录 1 行缓冲 2 scanf原理 3 scanf案例 1 代码 2 结果 1 行缓冲 行缓冲的意思就是我们输入的字符放入缓冲区 直到输入了回车键进行换行才进行 I O 操作 2 scanf原理 在C中的标准输入函数scanf 使用的
  • Xilinx FPGA PCIe XDMA性能测试报告(二)

    1 测试内容 本报告对Xilinx FPGA的PCIe XDMA结合DDR4 SDRAM缓存的性能进行了测试 同时 给出了具体的测试框图 测试平台 测试步骤 测试记录等内容 2 测试框图 3 测试平台 硬件平台 Dell R330 Inte
  • 求解汉诺塔问题(提示, 使用递归)

    汉诺塔问题是一个经典的问题 汉诺塔 Hanoi Tower 又称河内塔 源于印度一个古老传说 大梵天创造世界的时候做了三根金刚石柱子 在一根柱子上从下往上按照大小顺序摞着64片黄金圆盘 大梵天命令婆罗门把圆盘从下面开始按大小顺序重新摆放在另
  • IDEA Unescaped xml character报错的解决办法

    File Settings里边 选择Editor Inspections 并在右边找到HTML下的Malformed content of
  • 对Verilog 初学者比较有用的整理(转自它处)

    作者 Ian11122840 时间 2010 9 27 09 04 标题 菜鸟做设计必看 有关如何
  • JAVA中容器的概念

    解释一 容器 Container Spring 提供容器功能 容器可以管理对象的生命周期 对象与对象之间的依赖关系 您可以使用一个配置文件 通常是XML 在上面定义好对象的名称 如何产生 Prototype 方式或Singleton 方式
  • x86中vdso数据段的初始化及更新和使用

    1 vdso 数据段的初始化 1 数据段的定义 vdso 数据段由内核进行声明和定义 其中 在链接脚本 arch x86 entry vdso vdso layout lds S 里指定了 vdso 的数据段的名称和位置 相关内容如下 SE
  • C++函数运用学习篇

    输入一个字符串 串内有数字和非数字字符 例如 abc456 sd78fd123s 789df 将其中连续的数字作为一个整数 依次存放到另一个整型数组b中 例如上述例子 将456放入b 0 中 78放入b 1 中 统计出整数的个数并输出这些整
  • JUC 九. CompletableFuture

    目录 一 CompletableFuture 计算结果完成时回调 一个线程的执行依赖另一个线程 二 根据案例再次了解 CompletableFuture supplyAsync 四 CompletableFuture 常用api简介 获取结