java--基础--17.10--线程--CompletableFuture

2023-11-13

java–基础–17.10–线程–CompletableFuture


1、介绍

[外链在这里插入图片描述
防盗链机制,建议将图片保存下来直接上传(img-Rf7tQZjW-1693300440049)(./image1/1.png)]

1.1、实现Future接口

  1. 实现Future接口,并在此基础上进行了扩展和增强
  2. 弥补了Future的局限性

1.2、实现CompletionStage接口

  1. 实现CompletionStage接口,实现了对任务编排的能力。可以轻松地组织不同任务的运行顺序、规则以及方式。
  2. 定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。
  3. 异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。

2、创建异步操作

2.1、四个静态方法来创建一个异步操作

//以Runnable函数式接口类型为参数,没有返回结果
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);


//以Supplier函数式接口类型为参数,返回结果类型为U
// Supplier接口的 get()是有返回值的(会阻塞)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

2.2、执行异步代码的线程池

2.2.1、没有指定线程池(指定Executor的方法)

  1. 使用 ForkJoinPool.commonPool() 线程池 执行异步代码。
  2. ForkJoinPool.commonPool() 线程池 默认创建的线程数是 CPU 的核数
    1. 可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数。

如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

2.2.2、指定线程池

  1. 使用指定的线程池 执行异步代码

2.2.3、注意

如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

3、结果处理

  1. 方法不以Async结尾,意味着Action使用相同的线程执行
  2. 方法以Async结尾,Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
//当异步任务发生异常的时触发此方法,可以用来返回一个默认值
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

//当异步任务完成(不管是正常完成还是异常完成都会触发)之后触发,可以用来进行后续操作,无法修改返回值
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

//当异步任务完成(不管是正常完成还是异常完成都会触发)之后触发,如果是异常完成(异步任务的结果如果为 null则发生异常,否则为正常完成)可以用来返回默认值;如果是正常完成可以用来进行后续操作,并返回结果。
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);



3.1、exceptionally()

一般与whenComplete()配合使用,异常捕获范围包含前面的所有异步线程

具体使用:
  public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPool).exceptionally(excption -> {
            //可以感知异常,同时返回默认数据
            System.out.println("执行发生异常,返回默认数据,异常信息为:" + excption);
            return 10;
        });
        System.out.println("执行结果为:" + future.get());
    }


输出:

当前线程号:22
执行发生异常,返回默认数据,异常信息为:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
执行结果为:10

3.2、whenComplete()

一般与exceptionally()配合使用,获取前一个异步线程的结果和异常

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPool).whenCompleteAsync((res, excption) -> {
            //虽然能得到异常信息,但是没法修改返回数据
            System.out.println("异步任务成功执行....结果是:" + res + ",异常是:" + excption);
        }, threadPool);
        System.out.println("执行结果为:" + future.get());
    }


输出:
当前线程号:22
运行结果:5
异步任务成功执行....结果是:5,异常是:null
执行结果为:5

3.3、handle()

获取前一个异步线程的结果和异常,根据是否有异常产生执行不一样的逻辑

具体使用:

    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程号:" + Thread.currentThread().getId());
            //int i = 10 / 0;
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPool).handle((res, excption) -> {
            //异步方法执行完的后续处理
            if (excption != null) {
                System.out.println("执行发生异常,返回默认数据,异常信息为:" + excption);
                return 10;
            }
            System.out.println("异步任务成功执行....上一步的结果是:" + res);
            return res * 2;
        });
        System.out.println("执行结果为:" + future.get());
    }

输出:

当前线程号:22
运行结果:5
异步任务成功执行....上一步的结果是:5
执行结果为:10


当前线程号:22
执行发生异常,返回默认数据,异常信息为:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
执行结果为:10

4、线程穿行

  1. 方法不以Async结尾,意味着Action使用相同的线程执行
  2. 方法以Async结尾,可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
//不能获取上一步的执行结果,也没有自己的返回值
ConnectionFuture<Void> thenRun(Runnable var1);
ConnectionFuture<Void> thenRunAsync(Runnable var1);
ConnectionFuture<Void> thenRunAsync(Runnable var1, Executor var2);

//能获取上一步的结果,但是没有自己返回值
ConnectionFuture<Void> thenAccept(Consumer<? super T> var1);
ConnectionFuture<Void> thenAcceptAsync(Consumer<? super T> var1);
ConnectionFuture<Void> thenAcceptAsync(Consumer<? super T> var1, Executor var2);

//能获取上一步的结果,而且有自己的返回值
<U> ConnectionFuture<U> thenApply(Function<? super T, ? extends U> var1);
<U> ConnectionFuture<U> thenApplyAsync(Function<? super T, ? extends U> var1);
<U> ConnectionFuture<U> thenApplyAsync(Function<? super T, ? extends U> var1, Executor var2);

//能获取上一步的结果,而且有自己的返回值
<U> ConnectionFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> var1);
<U> ConnectionFuture<U> thenCompose(BiFunction<? super T, ? super Throwable, ? extends CompletionStage<U>> var1);
<U> ConnectionFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> var1);
<U> ConnectionFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> var1, Executor var2);

4.1、thenRun()

不能获取上一步的执行结果,也没有自己的返回值

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务结束,运行结果:" + i);
            return i;
        }, threadPool).thenRunAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
        }, threadPool);
        System.out.println("执行结果为:" + future.get());
    }
 
输出:

第一个任务,当前线程号:22
任务结束,运行结果:5
第二个任务,当前线程号:23
执行结果为:null

4.2、thenAccept()

能获取上一步的结果,但是没有自己返回值

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务结束,运行结果:" + i);
            return i;
        }, threadPool).thenAcceptAsync(res -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("上个任务的结果:" + res);
        }, threadPool);
        System.out.println("执行结果为:" + future.get());
    }
 
输出:
第一个任务,当前线程号:22
任务结束,运行结果:5
第二个任务,当前线程号:23
上个任务的结果:5
执行结果为:null

4.3、thenApply()

能获取上一步的结果,而且有自己的返回值,并且自己的返回值类型可以与上一个返回值的类型不一致

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务结束,运行结果:" + i);
            return i;
        }, threadPool).thenApplyAsync(res -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("上个任务的结果:" + res);
            return res * 2 + "个";
        }, threadPool);
        System.out.println("执行结果为:" + future.get());
    }
 
输出:
第一个任务,当前线程号:22
任务结束,运行结果:5
第二个任务,当前线程号:23
上个任务的结果:5
执行结果为:10个

4.4、thenCompose()

能获取上一步的结果,而且有自己的返回值,与thenApply()具有相同的功能

具体使用:

    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务结束,运行结果:" + i);
            return i;
        }, threadPool).thenComposeAsync(res -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("上个任务的结果:" + res);
            return CompletableFuture.supplyAsync(() -> {
                return res * 2 + "个";
            });
        }, threadPool);
        System.out.println("执行结果为:" + future.get());

    }
}



 
输出:

第一个任务,当前线程号:22
任务结束,运行结果:5
第二个任务,当前线程号:23
上个任务的结果:5
执行结果为:10个


与thenApply()的区别
  • thenApply()
    • 返回的不是CompletableFuture类型
    • 它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>
  • thenCompose()
    • 用来连接两个CompletableFuture
    • 返回值是新的CompletableFuture

5、两个任务组合–都要完成

  1. 有点类似于AND。
  2. 方法不以Async结尾,意味着Action使用相同的线程执行
  3. 方法以Async结尾,Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

//当两个任务都正常完成时,执行给定的操作
ConnectionFuture<Void> runAfterBoth(CompletionStage<?> var1, Runnable var2);
ConnectionFuture<Void> runAfterBothAsync(CompletionStage<?> var1, Runnable var2);
ConnectionFuture<Void> runAfterBothAsync(CompletionStage<?> var1, Runnable var2, Executor var3);

//当两个任务都正常完成时,使用两个结果作为参数,执行给定的操作,没有返回值
<U> ConnectionFuture<Void> thenAcceptBoth(CompletionStage<? extends U> var1, BiConsumer<? super T, ? super U> var2);
<U> ConnectionFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> var1, BiConsumer<? super T, ? super U> var2);
<U> ConnectionFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> var1, BiConsumer<? super T, ? super U> var2, Executor var3);

//当两个任务都正常完成时,使用两个结果作为参数,执行给定的操作,有返回值
<U, V> ConnectionFuture<V> thenCombine(CompletionStage<? extends U> var1, BiFunction<? super T, ? super U, ? extends V> var2);
<U, V> ConnectionFuture<V> thenCombineAsync(CompletionStage<? extends U> var1, BiFunction<? super T, ? super U, ? extends V> var2);
<U, V> ConnectionFuture<V> thenCombineAsync(CompletionStage<? extends U> var1, BiFunction<? super T, ? super U, ? extends V> var2, Executor var3);

5.1、runAfterBoth()

两个任务都执行完成后,执行下一步操作(Runnable类型任务),没有使用前面两个任务的结果,也没有返回值

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("第一个任务,任务结束,运行结果:" + i);
            return i;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 5;
            System.out.println("第二个任务,任务结束,运行结果:" + i);
            return i;
        }, threadPool);
        final CompletableFuture<Void> future = future1.runAfterBoth(future2, () -> {
            System.out.println("组合任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("执行指定操作,没有参数,没有返回值");
        });
        System.out.println("执行结果为:" + future.get());

    }
 
输出:

第一个任务,当前线程号:22
第二个任务,当前线程号:23
第二个任务,任务结束,运行结果:2
第一个任务,任务结束,运行结果:5
组合任务,当前线程号:22
执行指定操作,没有参数,没有返回值
执行结果为:null

5.2、thenAcceptBoth()

两个任务执行完成后,将结果交给thenAcceptBoth处理,可以使用前面两个任务的结果,但无自己的返回值

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("第一个任务,任务结束,运行结果:" + i);
            return i;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 5;
            System.out.println("第二个任务,任务结束,运行结果:" + i);
            return i;
        }, threadPool);
        final CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (res1, res2) -> {
            System.out.println("组合任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("执行指定操作,前面任务的结果为:" + res1 + "," + res2 + "。没有自己的返回值");
        });
        System.out.println("执行结果为:" + future.get());

    }


输出:
第一个任务,当前线程号:22
第二个任务,当前线程号:23
第一个任务,任务结束,运行结果:5
第二个任务,任务结束,运行结果:2
组合任务,当前线程号:23
执行指定操作,前面任务的结果为:5,2。没有自己的返回值
执行结果为:null

5.3、thenCombine()

两个任务执行完成后,将结果交给thenCombine处理,可以使用前面两个任务的结果,也有自己的返回值

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("第一个任务,任务结束,运行结果:" + i);
            return i;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int i = 10 / 5;
            System.out.println("第二个任务,任务结束,运行结果:" + i);
            return i;
        }, threadPool);
        final CompletableFuture<String> future = future1.thenCombineAsync(future2, (res1, res2) -> {
            System.out.println("组合任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("执行指定操作,前面任务的结果为:" + res1 + "," + res2 + "。有自己的返回值");
            return res1 + res2 + "个";
        });
        System.out.println("执行结果为:" + future.get());

    }

输出:
第一个任务,当前线程号:22
第二个任务,当前线程号:23
第二个任务,任务结束,运行结果:2
第一个任务,任务结束,运行结果:5
组合任务,当前线程号:24
执行指定操作,前面任务的结果为:5,2。有自己的返回值
执行结果为:7个

6、两个任务组合–一个完成即可

  1. 有点类似于OR。
  2. 方法不以Async结尾,意味着Action使用相同的线程执行,
  3. 方法以Async结尾,Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
//两个异步任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
ConnectionFuture<Void> runAfterEither(CompletionStage<?> var1, Runnable var2);
ConnectionFuture<Void> runAfterEitherAsync(CompletionStage<?> var1, Runnable var2);
ConnectionFuture<Void> runAfterEitherAsync(CompletionStage<?> var1, Runnable var2, Executor var3);

//两个异步任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
ConnectionFuture<Void> acceptEither(CompletionStage<? extends T> var1, Consumer<? super T> var2);
ConnectionFuture<Void> acceptEitherAsync(CompletionStage<? extends T> var1, Consumer<? super T> var2);
ConnectionFuture<Void> acceptEitherAsync(CompletionStage<? extends T> var1, Consumer<? super T> var2, Executor var3);

//两个异步任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
<U> ConnectionFuture<U> applyToEither(CompletionStage<? extends T> var1, Function<? super T, U> var2);
<U> ConnectionFuture<U> applyToEitherAsync(CompletionStage<? extends T> var1, Function<? super T, U> var2);
<U> ConnectionFuture<U> applyToEitherAsync(CompletionStage<? extends T> var1, Function<? super T, U> var2, Executor var3);

6.1、runAfterEither()

两个异步任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

具体使用:
   public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Void> future = future1.runAfterEither(future2, () -> {
            System.out.println("组合任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("执行指定操作,没有参数,没有自己的返回值");
        });
        System.out.println("执行结果为:" + future.get());

    }


输出:

第一个任务,当前线程号:22
第二个任务,当前线程号:23
第二个任务,任务结束,运行结果:2
组合任务,当前线程号:23
执行指定操作,没有参数,没有自己的返回值
执行结果为:null
第一个任务,任务结束,运行结果:5

6.2、acceptEither()

两个异步任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作(即没有自己的返回值)。

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Void> future = future1.acceptEither(future2, res -> {
            System.out.println("组合任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("执行指定操作,前面执行快的任务结果为:" + res + ",没有自己的返回值");
        });
        System.out.println("执行结果为:" + future.get());

    }


输出:

第二个任务,当前线程号:23
第一个任务,当前线程号:22
第一个任务,任务结束,运行结果:1
组合任务,当前线程号:22
执行指定操作,前面执行快的任务结果为:1,没有自己的返回值
执行结果为:null
第二个任务,任务结束,运行结果:5

6.3、applyToEither()

两个异步任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作(即有自己的返回值)。

具体使用:

    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);


        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<String> future = future1.applyToEither(future2, res -> {
            System.out.println("组合任务,当前线程号:" + Thread.currentThread().getId());
            System.out.println("执行指定操作,前面执行快的任务结果为:" + res + ",有自己的返回值");
            return res + "秒";
        });
        System.out.println("执行结果为:" + future.get());

    }



输出:
第二个任务,当前线程号:23
第一个任务,当前线程号:22
第二个任务,任务结束,运行结果:8
组合任务,当前线程号:23
执行指定操作,前面执行快的任务结果为:8,有自己的返回值
执行结果为:8秒
第一个任务,任务结束,运行结果:9

7、多任务组合

以下方法都为静态方法

7.1、anyOf()

在给定多个异步任务第一个完成时,就马上返回一个新的 CompletableFuture。结果与其第一个完成的异步任务相同。即第一个异常完成则最终结果为异常完成,第一个正常完成则最终结果为正常完成。

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第三个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第三个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Object> future = CompletableFuture.anyOf(future1, future2, future3);
        System.out.println("执行结果为:" + future.get());

    }


输出:
第三个任务,当前线程号:24
第二个任务,当前线程号:23
第一个任务,当前线程号:22
第一个任务,任务结束,运行结果:1
执行结果为:1
第三个任务,任务结束,运行结果:5
第二个任务,任务结束,运行结果:5

7.2、allOf()

7.2.1、方式1

当给定的多个异步任务都正常完成后,返回一个新的 CompletableFuture,给定 CompletableFuture 的结果不会反映在返回的 CompletableFuture 中,但可以通过单独检查给定任务来获得结果。

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int number = 5;
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第三个任务,当前线程号:" + Thread.currentThread().getId());
            int number = 2;
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第三个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);
        System.out.println("执行结果为:" + future.get());
        System.out.println("future1:" + future1.get());
        System.out.println("future2:" + future2.get());
        System.out.println("future3:" + future3.get());

    }


输出:
第三个任务,当前线程号:24
第二个任务,当前线程号:23
第一个任务,当前线程号:22
第三个任务,任务结束,运行结果:2
第一个任务,任务结束,运行结果:4
第二个任务,任务结束,运行结果:5
执行结果为:null
future1:4
future2:5
future3:2


7.2.2、方式2

当任何一个异步任务异常完成,则返回的CompletableFuture 也会异常完成,并且将该异步任务的异常作为其原因。

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int number = 5;
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第三个任务,当前线程号:" + Thread.currentThread().getId());
            int number = 2;
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第三个任务,任务结束,运行结果:" + number);
            throw new ArithmeticException();
        }, threadPool);
        final CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);
        System.out.println("执行结果为:" + future.get());

    }

输出:
第二个任务,当前线程号:23
第三个任务,当前线程号:24
第一个任务,当前线程号:22
第三个任务,任务结束,运行结果:2
第一个任务,任务结束,运行结果:4
第二个任务,任务结束,运行结果:5
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at fei.zhou.lambdalearn.completableFuture.Demo16.main(Demo16.java:57)
Caused by: java.lang.ArithmeticException
	at fei.zhou.lambdalearn.completableFuture.Demo16.lambda$main$2(Demo16.java:54)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)


7.2.3、方式3

当存在多个异常完成时,则返回排在前面的异步任务的异常信息。

具体使用:
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务,当前线程号:" + Thread.currentThread().getId());
            int number = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个任务,任务结束,运行结果:" + number);
            return number;
        }, threadPool);
        final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务,当前线程号:" + Thread.currentThread().getId());
            int number = 5;
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个任务,任务结束,运行结果:" + number);
            throw new NullPointerException();
        }, threadPool);
        final CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("第三个任务,当前线程号:" + Thread.currentThread().getId());
            int number = 2;
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第三个任务,任务结束,运行结果:" + number);
            throw new ArithmeticException();
        }, threadPool);
        final CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);
        System.out.println("执行结果为:" + future.get());

    }

输出:
第二个任务,当前线程号:23
第三个任务,当前线程号:24
第一个任务,当前线程号:22
第三个任务,任务结束,运行结果:2
第一个任务,任务结束,运行结果:3
第二个任务,任务结束,运行结果:5
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at fei.zhou.lambdalearn.completableFuture.Demo17.main(Demo17.java:57)
Caused by: java.lang.NullPointerException
	at fei.zhou.lambdalearn.completableFuture.Demo17.lambda$main$1(Demo17.java:43)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java--基础--17.10--线程--CompletableFuture 的相关文章

  • 如果您不在 Java 中进行克隆,那么您会做什么以及如何称呼它?

    有没有人对 Java 中的复制构造函数 工厂方法等有任何建议或已建立的最佳实践和命名约定 特别是 假设我有一堂课Thing我想要一个返回新值的方法Thing与 a 具有相同的值Thing传入 如果是实例方法 则作为实例 您会将其作为构造函数
  • 简单 XML 框架:ElementMap 中的对象具有“类似内联”的行为

    我正在尝试在 Android 上序列化自定义对象的 Hashmap 以获得如下 xml
  • 了解 netty 通道缓冲区和水印

    我正在尝试了解网络缓冲区和水印 作为一个测试用例 我有一个 netty 服务器 它向客户端写入数据 客户端被阻止 基本上每次读取之间有 10 秒的睡眠时间 在正常 I O 下 如果接收方被阻塞 TCP 发送方将受到限制 由于流量控制 发送速
  • 如何在 Android 中的 Chrome 或 Firefox 等特定浏览器的 Web 视图中加载应用程序

    我是 Android 新手 我正在做一个应用程序 我需要在平板电脑上的 Web 视图中加载现有的应用程序 在平板电脑中 当我使用 Web 视图加载应用程序时 我的应用程序将加载到默认浏览器中 如何在平板电脑上的 Web 视图中的特定浏览器
  • 垂直 ViewPager 中的动画

    我需要垂直制作这个动画ViewPager https www youtube com watch v wuE 4jjnp3g https www youtube com watch v wuE 4jjnp3g 这是我到目前为止所尝试的 vi
  • Apache Thrift Java-Javascript 通信

    我正在编写一个基于 Apache Thrift 的 Java 服务器 它将从 Javascript 客户端接收数据 我已经完成了 Java 服务器 但问题是我可以获得 Javascript 客户端的工作示例 我无法找到一个好的示例 构建文档
  • 获取Android库中的上下文

    我正在编写一个 Android 应用程序 它的一些功能封装在内部库中 但是 要使此功能发挥作用 库需要一个应用程序上下文的实例 为图书馆提供这种上下文的最佳方式是什么 我看到了一些选择 但没有一个有吸引力 Have my library c
  • Java Junit 测试 HTTP POST 请求

    我需要测试以下方法而不改变方法本身 该方法向服务器发出 POST 方法 但我需要制作一个独立于服务器的测试用例 在将其重定向到本地文件之前 我测试了类似的方法 但为此我将协议指定为文件 主机名指定为 localhost 端口指定为 1 我的
  • 绘制平滑曲线

    我想创建更平滑的曲线 而不仅仅是线角 这是我现在画的图 这是我的代码 case FREEHAND float pts float ptk ptk new float 2 imageMatrix invert inv if mCurrentS
  • for循环中更新JLabel的问题

    我的程序的想法是从之前在其他 JFrame 中保存的列表中选择一个名称 我想在标签中一个接一个地打印所有名称 它们之间有很小的延迟 然后停在其中一个名称上 问题是lbl setText String 如果有多个则不起作用setText co
  • Java 中如何验证字符串的格式是否正确

    我目前正在用 Java 编写一个验证方法来检查字符串是否是要更改为日期的几种不同格式之一 我希望它接受的格式如下 MM DD YY M DD YY MM D YY 和 M D YY 我正在测试第一种格式 每次它都告诉我它无效 即使我输入了有
  • 如何在Netbeans中设置JList的ListModel?

    我在 Netbeans IDE 的帮助下设计了一个 Swing GUI 该 GUI 包含一个 JList 默认情况下 它使用 QAbstractListModel 将其作为 JList 构造函数中的参数传递以创建该 JList 我想在 Ne
  • 如何让“循环”泛型在 Java 中工作?

    我在编译以下涉及一些泛型的代码时遇到错误 public abstract class State
  • JERSEY:错误跟踪:java.lang.IllegalStateException:实体输入流已关闭

    我正在使用 Jersey 2 x 以下是我的控制器 GET Path id Produces application json public Response getUser PathParam id int userId Context
  • ActiveMQ JNDI 查找问题

    尝试使用 JNDI 运行以下 ActiveMQ http activemq apache org jndi support html http ActiveMQ 20JNDI 并且我的 jboss server node lib 文件夹中有
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • Janusgraph 0.3.2 + HBase 1.4.9 - 无法设置 graph.timestamps

    我在 Docker 容器中运行 Janusgraph 0 3 2 并尝试使用运行 HBase 1 4 9 的 AWS EMR 集群作为存储后端 我可以运行 gremlin server sh 但如果我尝试保存某些内容 我会得到粘贴在下面的堆
  • Hibernate 标准接受 %% 值

    我正在使用下面的 Hibernate 代码来过滤workFlowName crt add Restrictions like workFlowName workFlow MatchMode ANYWHERE crt is the crite
  • 无法连接到docker中的elasticsearch容器

    我正在尝试使用 docker 的官方 elasticsearch 镜像 我遵循了本指南 https www elastic co guide en elasticsearch reference current docker html但是当
  • 从 InputStream 中删除换行符

    我喜欢从一个文件中删除所有换行符 对于 n 和 r n java io InputStream 在读取文件时 相应的方法如下所示 param target linkplain File return linkplain InputStrea

随机推荐

  • 在vue使用jsx来解决template中复杂的逻辑处理

    1 首先安装依赖 npm install postcss loader autoprefixer babel loader babel core 2 在 babelrc文件中修改 把 presets env stage 2 plugins
  • 【Python】Windows如何在cmd中切换python版本

    相信很多小伙伴都会有像我一样经历 在windows中装了很多python版本 那么如果我们正式使用的时候应该如何切换呢 方法一 从环境变量中切换python 第一步 打开环境变量 第二步 打开系统变量中Path变量 第三步 将你想使用的Py
  • spring 多个切面的执行顺序及原理

    最近和同事聊起来了springAOP的话题 说了到多个切面的时候程序是怎么执行的 我们常用的spring事务本身也是一个切面 使用的AOP原理 本人从网上找了一些资料 然后根据这些资料进行一下总结 资料地址 1 https blog csd
  • CodeLlama本地部署的实战方案

    大家好 我是herosunly 985院校硕士毕业 现担任算法研究员一职 热衷于机器学习算法研究与应用 曾获得阿里云天池比赛第一名 CCF比赛第二名 科大讯飞比赛第三名 拥有多项发明专利 对机器学习和深度学习拥有自己独到的见解 曾经辅导过若
  • C++:没有与参数列表匹配的构造函数

    报错 E0289 没有与参数列表匹配的构造函数 sales data sales data 实例 初始化一个实例对象 类内定义的构造函数 报错原因 构造函数中第二个参数的类型为 unsigned 而引用只能是引用一个对象 实例化对象时 括号
  • 神经网络(十四)Pytorch完整模型训练和调用GPU加速

    一 模型的训练 Step1 准备数据集 import torchvision train data torchvision dataset CIFAR10 data train True transform torchvision ToTe
  • 微信投屏服务器出错,微信发布7.0.21版本,修复异常问题,增加超实用新功能

    原标题 微信发布7 0 21版本 修复异常问题 增加超实用新功能 iOS微信又更新了 今天发布v7 0 21版本 距离上次更新才两个礼拜的时间 以往微信的更新频率是比较慢的 这次之所以如此之快地发布新版本 感觉是与问题修复有关 因为之前有一
  • UNIX环境高级编程 学习笔记 第六章 系统数据文件和信息

    UNIX系统口令文件 POSIX 1称其为用户数据库 包含以下字段 这些字段也包含在头文件pwd h中定义的passwd结构中 由于历史原因 口令文件 etc passwd是一个ASCII文件 其中的每行都包含以上各字段 字段之间用冒号分隔
  • 无法使用@RequestBody或无法直接使用对象类型获取前端的传递的对象数据

    一 问题概述 当前端发送请求时 传递的参数是一个对象类型 例如 searchForm name 1 age 18 这种格式时 会习惯性使用 RequestBody在后端进行接收 但会发现无法接收到数据 如果你使用的请求方式是get 用的还是
  • 学好ES6/ES2015-核心部分(上)

    ECMAScript 6 以下简称ES6 是JavaScript语言的下一代标准 因为当前版本的ES6是在2015年发布的 所以又称ECMAScript 2015 也就是说 ES6就是ES2015 虽然目前并不是所有浏览器都能兼容ES6全部
  • 贝叶斯优化python包_贝叶斯优化

    万壑松风知客来 摇扇抚琴待留声 1 文起 本篇文章记录通过 Python 调用第三方库 从而调用使用了贝叶斯优化原理的 Hyperopt 方法来进行超参数的优化选择 具体贝叶斯优化原理与相关介绍将在下一次文章中做较为详细的描述 可以参考这里
  • React:判断是否为true有可能会出现的问题

    今天遇到个小问题改了好久 因为state值是要改成url地址 以为是自己哪里逻辑出问题了 搞了好久才发现 是自己判断出现了错误 记录一下 写个小例子 防止二次发生 菜鸡一枚 还希望得到大佬的详解 以下是数字和字符串隐试转换规则 任何非零的数
  • sqli-labs:less-11/12 简单SQL注入和身份验证漏洞综合

    这两个靶场是一样的题 我就拿less 12说事了吧 首先 尝试胡乱输入密码进行测试 发现存在报错 这时用admin和admin这个正确的账号密码进行测试 1 10前面的题目告诉了 发现有着正确的提示 但是还不够 我们尝试在username后
  • vue中axios的二次封装

    1 如果对axios不了解的可以先移步中文axios网 axios中文文档 axios中文网 axioshttp www axios js com zh cn docs 2 这是稀土掘金上的关于axios的封装个人感觉比较细节易懂 vue中
  • UE4 Slate 柱状图

    Fill out your copyright notice in the Description page of Project Settings include ZZTWidget h include Windows AllowWind
  • [Binospace] 深入分析HBase RPC(Protobuf)实现机制

    背景 在HMaster RegionServer内部 创建了RpcServer实例 并与Client三者之间实现了Rpc调用 HBase0 95内部引入了Google Protobuf作为中间数据组织方式 并在Protobuf提供的Rpc接
  • 类默认成员函数之拷贝构造函数

    学习完类中的默认成员函数 构造函数和析构函数 下一个学习的默认成员函数是拷贝构造函数 那么拷贝构造函数与前两个函数又有哪些区别呢 1 拷贝构造函数首先我们先知道它与构造函数构成重载 故函数名相同 我们先写一个最简单拷贝函数 class Da
  • C# 接口Get请求带Body

    这几天在调用第三方的接口遇到了GET请求并且还要带有body参数 并且参数类型还得是Json 在我的记忆力GET都是URL传参 网上找了好多方法都不行 用Postman调用可以请求到数据 但是C 代码怎么弄都不行 用Postman生成得代码
  • eclipse中提示的时候总是出现未响应的解决方案

    将eclipse自带的jre换成自己安装的jdk的jre Eclipse Window Preferences Java Installed JREs 选择jdk的安装路径 然后确定 之后勾选jdk就行了
  • java--基础--17.10--线程--CompletableFuture

    java 基础 17 10 线程 CompletableFuture 1 介绍 外链 防盗链机制 建议将图片保存下来直接上传 img Rf7tQZjW 1693300440049 image1 1 png 1 1 实现Future接口 实现