第二章:CompletableFuture

2023-11-02

Future接口理论知识复习

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

Future接口常用实现类FutureTask异步任务 

Future接口能干什么?

Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过Future把这个任务放到异步线程中执行。 主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

代码说话:

  1. Runnable接口
  2. Callable接口
  3. Future接口和FutureTask实现类

目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务。

本源的Future接口相关的架构

package com.lzx.juc.cf;

import java.util.concurrent.*;

/**
 * @author admin
 */
public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());

        Thread t1 = new Thread(futureTask, "t1");
        t1.start();

        System.out.println(futureTask.get());
    }
    
}

class MyThread implements Callable<String> {

    @Override
    public String call() {
        System.out.println("-----come in call() ");
        return "hello Callable";
    }

}

Future编码实战和有缺点分析

优点

Future+线程池异步多线程任务配合,能显著提高程序的执行效率。

上述案例case

package com.lzx.juc.cf;

import java.util.concurrent.*;

/**
 * @author admin
 */
public class FutureThreadPoolDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //3个任务,目前开启多个异步任务线程来处理,请问耗时多少?
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        long startTime = System.currentTimeMillis();

        FutureTask<String> futureTask1 = new FutureTask<>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task1 over";
        });

        threadPool.submit(futureTask1);

        FutureTask<String> futureTask2 = new FutureTask<>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task2 over";
        });

        threadPool.submit(futureTask2);

        System.out.println(futureTask1.get());
        System.out.println(futureTask2.get());

        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");


        System.out.println(Thread.currentThread().getName() + "\t -----end");
        threadPool.shutdown();
    }

    private static void m1() {
        //3个任务,目前只有一个线程main来处理,请问耗时多少?
        long startTime = System.currentTimeMillis();

        //暂停毫秒
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");

        System.out.println(Thread.currentThread().getName() + "\t -----end");
    }
}

缺点

Code1

package com.bilibili.juc.cf;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 1 get容易导致阻塞,一般建议放在程序后面,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,容易程序堵塞。
 * 2 假如我不愿意等待很长时间,我希望过时不候,可以自动离开.
 *
 * @author admin
 */
public class FutureAPIDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t -----come in");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });

        Thread t1 = new Thread(futureTask, "t1");
        t1.start();

        System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");

        //System.out.println(futureTask.get());
        System.out.println(futureTask.get(3,TimeUnit.SECONDS));
    }
}

get()阻塞

一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞。

Code2

package com.lzx.juc.cf;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 1 get容易导致阻塞,一般建议放在程序后面,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,容易程序堵塞。
 * 2 假如我不愿意等待很长时间,我希望过时不候,可以自动离开.
 *
 * @author admin
 */
public class FutureAPIDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t -----come in");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });

        Thread t1 = new Thread(futureTask, "t1");
        t1.start();

        System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");

        //System.out.println(futureTask.get());
        //System.out.println(futureTask.get(3,TimeUnit.SECONDS));

        while (true) {
            if (futureTask.isDone()) {
                System.out.println(futureTask.get());
                break;
            } else {
                //暂停毫秒
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");
            }
        }
    }
}

idDone()轮询

轮询的方式会消耗无谓的CPU资源,而且也不见得能及时的得到计算结果;如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。

结论

Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

想完成一些复杂的任务

对于简单的业务场景使用Future是完全的OK的

Future就显得有些力不从心了;

回调通知

应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知;通过轮询的方式去判断任务是否完成这样非常占CPU,并且代码也不优雅。

创建异步任务

Future+线程池配合

多个任务前后依赖可以组合处理

1、想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值。将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个有依赖前一个处理的结果。

2、对计算速度选最快;当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。

。。。。。。

再这样的场景下,再使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。

从i到i++

Future能干的,CompletableFutrue都能干。

CompletableFuture对Future的改进

CompletableFuture为什么会出现

get()方法在Future计算完成之前会一直在阻塞状态下,isDone()方法容易消耗CPU资源,对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无谓的CPU资源。因此,JDK8设计出CompletableFuture。

CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

CompletableFuture和CompletionStage源码分别介绍

类架构说明

接口CompletionStage 

是什么?

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

类CompletableFuture 

是什么?

核心的四个静态方法,来创建一个异步任务 

runAsync无返回值

public static CompletableFutrue<Void> runAsync(Runnable runnable);

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

supplyAsync有返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor );

上述Executor executor参数说明

没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码。

无返回值

package com.lzx.juc.cf;

import java.util.concurrent.*;

/**
 * @author admin
 */
public class CompletableFutureBuildDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());

            //暂停几秒钟线程
            try { 
                TimeUnit.SECONDS.sleep(1); 
            } catch (InterruptedException e) {
                e.printStackTrace(); 
            }
        }, threadPool);

        System.out.println(completableFuture.get());

        threadPool.shutdown();
    }

}

有返回值

package com.lzx.juc.cf;

import java.util.concurrent.*;

/**
 * @author admin
 */
public class CompletableFutureBuildDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());

            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "hello supplyAsync";
        }, threadPool);

        System.out.println(completableFuture.get());

        threadPool.shutdown();
    }
}

Code之通用演示减少阻塞和轮询

从Java8开始引入了CompletableFuture,它是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

package com.lzx.juc.cf;

import java.util.concurrent.*;

/**
 * @author admin
 */
public class CompletableFutureUseDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        try {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "----come in");
                int result = ThreadLocalRandom.current().nextInt(10);

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("-----1秒钟后出结果:" + result);

                if (result > 2) {
                    int i = 10 / 0;
                }

                return result;
            }, threadPool).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("-----计算完成,更新系统UpdateValue:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                return null;
            });

            System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }


        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        //try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

    }

    private static void future1() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----come in");
            int result = ThreadLocalRandom.current().nextInt(10);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("-----1秒钟后出结果:" + result);
            return result;
        });

        System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");

        System.out.println(completableFuture.get());
    }

}

解释下为什么默认的线程池关闭,自定义的线程池记得关闭。

CompletableFuture的优点

异步任务结束时,会自动回调某个对象方法;

主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行;

异步任务出错时,会自动回调某个对象的方法;

案例精讲-从电商网站的比价需求说开去

先看看大厂面试题

函数式编程已经主流

Lambda表达式+Stream流失调用+Chain链式调用+Java8函数式编程。

Runnable

Runnable已经说过很多次了,无参数,无返回值。

Function

Function<T, R>接受一个参数,并且有返回值。

Consumer

Consumer消费型函数接口,接受一个参数,没有返回值。

BiConsumer

BiConsumer<T, U>消费型函数接口,接受两个参数(Bi,英文单词词根,代表两个的意思),没有返回值。

Supplier

Supplier供给型函数接口,没有参数,有一个返回值。

小总结

先说说join和get对比 

join与get在功能上几乎没有什么区别,区别在云get在编译期会有抛出检查异常,而join不会。

说说你过去工作中的项目亮点?大厂业务需求说明?

切记,功能->性能,先满足功能的完成,再到性能的完善。

电商网站比价需求分析。

1、    需求说明

1.1、    同一款产品,同时搜索出同款产品在各大电商平台的售价;

1.2、    同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少?

2、输出返回:

出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>

《mysql》in jd price is 88.5

《mysql》 in dangdang price is 86.11

《mysql》 in Taobao price is 90.43

3、解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表

1、step by step,按部就班,查完京东查淘宝,查完淘宝查天猫……

2、all in,万箭齐发,一口气对线程异步任务同时查询。

一波流Java8函数式编程带走-比价案例实战Case

package com.lzx.juc.cf;

import lombok.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author admin
 * <p>
 * 案例说明:电商比价需求,模拟如下情况:
 * <p>
 * 1需求:
 * 1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
 * 1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
 * <p>
 * 2输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
 * 《mysql》 in jd price is 88.05
 * 《mysql》 in dangdang price is 86.11
 * 《mysql》 in taobao price is 90.43
 * <p>
 * 3 技术要求
 * 3.1 函数式编程
 * 3.2 链式编程
 * 3.3 Stream流式计算
 */
public class CompletableFutureMallDemo {

    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"),
            new NetMall("pdd"),
            new NetMall("tmall")
    );

    /**
     * step by step 一家家搜查
     * List<NetMall> ----->map------> List<String>
     *
     */
    public static List<String> getPrice(List<NetMall> list, String productName) {
        //《mysql》 in taobao price is 90.43
        return list
                .stream()
                .map(netMall ->
                        String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    /**
     * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
     *
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
        return list.stream().map(netMall ->
                        CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        
        for (String element : list1) {
            System.out.println(element);
        }
        
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");

        System.out.println("--------------------");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        
        for (String element : list2) {
            System.out.println(element);
        }
        
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime2 - startTime2) + " 毫秒");
    }
}

class NetMall {

    @Getter
    private String netMallName;

    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}
CompletableFuture常用方法

1、获得结果和触发计算

获取结果

public T get();---不见不散

public T get(long timeout, TimeUnit unit);---过时不候

public T join();

public T getNow(T valueIfAbsent);----没有计算完成的情况下,给一个替代的结果;立即获取结果不阻塞(计算完,返回计算完成后的结果;没计算完,返回设定的valueIfAbsent值)。

public boolean complete(T value);----主动触发计算;是否打断get方法立即反回括号值。

package com.lzx.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @author admin
 */
public class CompletableFutureAPIDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        group1();
    }

    /**
     * 获得结果和触发计算
     *
     */
    private static void group1() throws InterruptedException, ExecutionException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });

        //System.out.println(completableFuture.get());
        //System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
        //System.out.println(completableFuture.join());

        //暂停几秒钟线程
        //try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        //System.out.println(completableFuture.getNow("xxx"));
        System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.get());
    }

}

2、对计算结果进行处理 

thenApply()----计算结果存在依赖关系,这两个线程串行化。

package com.lzx.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author admin
 */
public class CompletableFutureAPI2Demo {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1;
        }, threadPool).thenApply(f -> {
            int i=10/0;
            System.out.println("222");
            return f + 2;
        }).thenApply(f -> {
            System.out.println("333");
            return f + 3;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("----计算结果: " + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });

        System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");

        threadPool.shutdown();
    }

}

异常相关;由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。

handle()----计算结果存在依赖关系,这两个线程串行化。

package com.bilibili.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author admin
 */
public class CompletableFutureAPI2Demo {
    
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1;
        }, threadPool).handle((f, e) -> {
            int i = 10 / 0;
            System.out.println("222");
            return f + 2;
        }).handle((f, e) -> {
            System.out.println("333");
            return f + 3;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("----计算结果: " + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });

        System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");

        threadPool.shutdown();
    }
}

异常相关;有异常也可以往下一步走,根据带的异常参数可以进一步处理。

总结

3、对计算结果进行消费 

接收任务的处理结果,并消费处理,无返回结果。

thenAccept();

package com.lzx.juc.cf;

import java.util.concurrent.CompletableFuture;

/**
 * @author admin
 */
public class CompletableFutureAPI3Demo {

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 1)
                .thenApply(f -> f + 2)
                .thenApply(f -> f + 3)
                .thenAccept(System.out::println);
    }

}

对比补充

Code之任务之间的顺序执行

thenRun()

thenRun(Runnable runnable);---任务A执行完成执行B,并且B不需要A的结果。

thenAccept()

thenAccpet(Consumer action);---任务A执行完执行B,B需要A的结果,但是任务B无返回值

thenApply()

thenApply(Function fn);---任务A执行完执行B,B需要A的结果,同时任务B有返回值

code

package com.lzx.juc.cf;

import java.util.concurrent.CompletableFuture;

/**
 * @author admin
 */
public class CompletableFutureAPI3Demo {

    public static void main(String[] args) {
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(System.out::println).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
    }

}

CompletableFuture和线程池说明

以thenRun和thenRunAsync为例,有什么区别?

package com.lzx.juc.cf;

import java.util.concurrent.*;

/**
 * @author admin
 */
public class CompletableFutureWithThreadPoolDemo {
    
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        try {
            CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
                return "abcd";
            }, threadPool).thenRunAsync(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
            });
            System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
    
}

小总结

1、没有传入自定义线程池,都用默认线程池ForkJoinPool。

2、传入了一个自定义线程池,如果你执行第一个任务的时候,传入了一个自定义线程池:

调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinPoo线程池。

3、备注

有可能处理太快,系统优化切换原则,直接使用main线程处理。

其他如:thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理。

调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinPoo线程池。

源码分析

4、对计算速度进行选用 

谁快用谁

applyToEither()

package com.lzx.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author admin
 */
public class CompletableFutureFastDemo {
    
    public static void main(String[] args) {
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playA";
        });

        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playB";
        });

        CompletableFuture<String> result = playA.applyToEither(playB, f -> f + " is winer");

        System.out.println(Thread.currentThread().getName() + "\t" + "-----: " + result.join());
    }
}

5、对计算结果进行合并

两个CompletableStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理;先完成的先等着,等待其他分支任务。

thenCombine()

Code标准版,好理解先拆分

package com.lzx.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author admin
 */
public class CompletableFutureCombineDemo {
    
    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        });

        CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("-----开始两个结果合并");
            return x + y;
        });

        System.out.println(result.join());

    }

}

Code表达式

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

第二章:CompletableFuture 的相关文章

  • Maven 2:如何将当前项目版本打包在WAR文件中?

    我正在使用 Maven 2 构建我的 Java 项目 并且正在寻找一种向用户呈现 pom xml 当前版本号的方法 例如使用 Servlet 或 JSP 据我所知 最好的方法是 Maven 将版本号作为文本文件打包到 WAR 中 这使我能够
  • 具有默认值的 Java JAX-RS 自定义参数

    假设我有这个 这只是一个示例 GET Path value address Produces application json public Response getAddress QueryParam user User user 用户是
  • 我在socket上设置了超时,发现这个值不能大于21

    我在socket上设置了超时 该值小于21秒才有效 21秒后发现超时还是21秒 public static void main String args SimpleDateFormat sdf new SimpleDateFormat yy
  • Java - 如何将特殊字符放入字符串中

    Java 似乎有很好的字符串处理能力 尽管如此 我还是遇到了最简单的问题 我需要动态字符串 它们在运行时更改 因此字符串类型不是一个好的选择 因为它们是不可变的 所以我使用字符数组 设置起来有点痛苦 但至少它们是可以修改的 我想创建一个字符
  • 在Java中使用命令行编译多个包

    您好 我一直在使用 IDE 但现在我需要从命令行运行和编译 问题是我有多个软件包 我试图找到答案 但没有任何效果 所以我有 src Support java files Me java files Wrapers java files 你知
  • jvm 次要版本与编译器次要版本

    当运行使用具有相同主要版本但次要版本高于 JVM 的 JDK 编译的类时 JVM 会抛出异常吗 JDK 版本并不重要 类文件格式版本 http blogs oracle com darcy entry source target class
  • 使用 Java 在 WebDriver 中按 Ctrl+F5 刷新浏览器

    我已经使用 java 刷新了 WebDriver 中的浏览器 代码如下 driver navigate refresh 如何使用 Java 在 WebDriver 中按 Ctrl F5 来做到这一点 我认为您可以使用 WebDriver 和
  • 无法使用 json 架构验证器根据预定义的 yaml 文件验证查询参数

    我需要根据预定义的 yaml 文件架构验证查询参数的架构 因此我使用 json 架构验证器 验证如何失败 我正在执行以下步骤 填充参数和相应的架构 final List
  • Java 正则表达式 - 字母数字,最多一个连字符,句点或下划线,七个字符长

    我是 Java 正则表达式工具的新手 尽管它们潜力巨大 但我很难完成这项任务 我想编写一个正则表达式来验证遵循以下语法的输入字符串 小写字母和数字的任意组合 仅一个下划线 一个破折号或一个句号 无其他特殊字符 最小长度为 5 我想出了以下解
  • 有多少种方法可以将位图转换为字符串,反之亦然?

    在我的应用程序中 我想以字符串的形式将位图图像发送到服务器 我想知道有多少种方法可以将位图转换为字符串 现在我使用 Base64 格式进行编码和解码 它需要更多的内存 是否有其他可能性以不同的方式做同样的事情 从而消耗更少的内存 现在我正在
  • JFace ColumnWeigthData 导致父级增长

    我有一个 Eclipse RCP 应用程序 并且想要在TableViewer using ColumnWeigthData as ColumnLayoutData 问题是父表单 ScrolledForm在示例代码中 每当我布局表格时都会增加
  • 如何在 Eclipse 中使用其他外部 jar 依赖项创建不可运行/不可执行的 jar

    我无法通过 Eclipse 导出向导创建普通的 jar 不可运行 不可执行 它仅创建 jar 文件 但不会导出依赖的 jar 从而在从其他类调用导出的 jar 的方法时出现错误 请帮助 非常感谢 kurellajunior的建议 它是通过使
  • 发生错误。请参阅日志文件 - eclipse juno

    每当我启动 Eclipse Juno 时 都会出现错误 发生错误 请查看日志文件 C Program Files eclipse configuration 1362989254411 log 有的网站说卸载jdk重新安装 我这样做了 但没
  • 我想在java中使用XQuery进行Xml处理

    我想用XQuery用于从 java 中的 Xml 获取数据 但我没有得到需要为此添加哪个 Jar 我在谷歌上搜索了很多 但没有得到任何有用的例子 例如我得到以下链接 https docs oracle com database 121 AD
  • 创建正则表达式匹配数组

    在Java中 我试图将所有正则表达式匹配返回到一个数组 但似乎您只能检查模式是否匹配某些内容 布尔值 如何使用正则表达式匹配来形成与给定字符串中的正则表达式匹配的所有字符串的数组 4城堡的回答 https stackoverflow com
  • JavaFX - 为什么多次将节点添加到窗格或不同的窗格会导致错误?

    我现在正在学习基本的 JavaFX 我不明白我正在阅读的书中的这一说法 不 诸如文本字段之类的节点只能添加到一个窗格中一次 将节点添加到多次窗格或不同的窗格将导致运行时错误 我可以从书中提供的UML图看出它是一个组合 但我不明白为什么 库类
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 如何初始化静态地图?

    你会如何初始化静态Map在Java中 方法一 静态初始化方法二 实例初始化 匿名子类 或者 还有其他方法吗 各自的优点和缺点是什么 这是说明这两种方法的示例 import java util HashMap import java util
  • 公共方法与公共 API

    在干净的代码书中 有一个观点是 公共 API 中的 Javadocs 同样 Effective java 一书也有这样的内容 项目 56 为所有公开的 API 元素编写文档注释 所以这就是我的问题 所有公共方法都被视为公共 API 吗 它们
  • 每次我们调用浏览器时,在 selenium 中使用 driver.manage().window().maximize() 是否好?

    We use driver manage window maximize 最大化浏览器 我在网上看到一些使用的例子driver manage window maximize 尽管不需要最大化浏览器 例如 gmail 登录 我还看到使用 se

随机推荐

  • 目标检测模型的评价指标 mAP

    在使用机器学习解决实际问题时 通常有很多模型可用 每个模型都有自己的怪癖 quirks 并且基于各种因素 性能会有所不同 模型性能的评定都是在某个数据集上进行的 通常这个数据集被称为 validation 或 test 数据集 模型性能的评
  • Java+Swing形成GUI图像界面

    一 Swing 简介 Swing 主要用来开发 GUI 程序 GUI Graphical User Interface 即图形用户界面 Java 中针对 GUI 设计提供了丰富的类库 这些类分别位于 java awt 和 java swin
  • Android高仿qq及微信底部菜单的几种实现方式

    文章目录 导航类型 第一种方式 侧滑菜单 底部导航 已经实现聊天 表情 图片 位置 语音等信息的发送 第二种方式 Fragment PopupWindow仿QQ空间最新版底部菜单栏 第三种方式 FragmentTabHost实现qq底部Ta
  • (一)在ubuntu20.04安装VPN服务

    很多时候需要从世界各地来访问公司服务器 电脑 工厂设备 实现方式有很多种 主要分为VPN和内网穿透方式 但是他们俩都存在一些问题 例如内网穿透主要利用外网IP 端口映射内网IP地址 端口方式 需要在设备端 电脑端装软件 例如frp方式需要在
  • 5.C++力扣刷题645

    题目 集合 s 包含从 1 到 n 的整数 不幸的是 因为数据错误 导致集合里面某一个数字复制了成了集合里面的另外一个数字的值 导致集合丢失了一个数字并且有一个数字重复 给定一个数组 nums 代表了集合 S 发生错误后的结果 请你找出重复
  • 最大子数组问题

    最大子数组问题 本文只是做一个记录 更细致的思路请查看算法导论 最大子数组结构体 typedef struct int low high sum SubArray 暴力求解 计算所有的数组区间的和进而得到最大的子数组 算法复杂度为 n 这种
  • TypeError: _open() got an unexpected keyword argument 'as_grey'

    报错 TypeError open got an unexpected keyword argument as grey 解决方法 把caffe io load imga读取图片改成cv2读取 image cv2 imread imageP
  • Windows设置IP地址

    控制面板里的 网络和共享中心 里可以设置IP地址 如果里面是 自动IP地址 电脑的IP地址就会 变化莫测 手动设置就可以避免这种问题 自动IP地址 其实这就是电脑的默认设置 修改路径如下 随便打开一个文件夹 gt 在地址栏输入 控制面板 g
  • kaggle入门(二)——Spaceship Titanic

    Spaceship Titanic KagglePredict which passengers are transported to an alternate dimensionhttps www kaggle com competiti
  • CentOS7安装Hadoop和zookeeper的各种问题

    最近在上大数据选修课 课程实验需要在CentOS上安装Hadoop和zookeeper 遇到了很多问题 记录一下 原本准备从头写个完整的安装记录的 发现我实在是不想回头看一遍了 就记录一些需要注意的地方吧 虚拟机的安装 安装过程按照书上面来
  • Java的Socket编程实例

    socket简介 套接字 socket 是一个抽象层 应用程序可以通过它发送或接收数据 可对其进行像对文件一样的打开 读写和关闭等操作 套接字允许应用程序将I O插入到网络中 并与网络中的其他应用程序进行通信 代码解释 Socket编程可以
  • c#应用程序的发布

    最近做了C 软件 发布给客户用时 发现客户运行不起来 原因是客户电脑上没有装Frame Work库 那么我们得把exe打包发布 发布的安装文件相当的大 以FrameWork为例达到了200多M 这就是为什么 有的应用程序只有1M多 可是发布
  • 聊聊项目测试时间不足怎么办

    这是鼎叔的第十九篇原创文章 行业大牛和刚毕业的小白 都可以进来聊聊 欢迎关注本人专栏和微信公众号 敏捷测试转型 大量原创思考文章陆续推出 4月23日周日 鼎叔返场再次参与小道消息播客 和主持人老徐和兔子继续畅谈 本文是返场直播的第一部分 重
  • 网络安全入门笔记(共327页),助你步入安全门槛

    前言 随着Web技术发展越来越成熟 而非Web服务越来越少的暴露在互联网上 现在互联网安全主要指的是Web安全 为了自身不 裸奔 在大数据里 渐渐开始学习Web安全 在学习Web安全的过程中 发现很大一部分知识点都相对零散 如果没有相对清晰
  • 【Pytorch】第 4 章 :时间差异和 Q 学习

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • 华为OD机试真题 Java 实现【简单的解压缩算法】【2023Q1 200分】,附详细解题思路

    一 题目描述 现需要实现一种算法 能将一组压缩字符串还原成原始字符串 还原规则如下 1 字符后面加数字N 表示重复字符N次 例如 压缩内容为A3 表示原始字符串为AAA 2 花括号中的字符串加数字N 表示花括号中的字符串重复N次 例如 压缩
  • 遍历Map集合的四种方式

    遍历map集合 package map import java util HashMap import java util Map public class Test01 public static void main String arg
  • jmeter 提取响应头中的set-Cookie

    性能测试时 有一个登录接口 登录成功后 会返回一个登录凭证token 但这个token不是在响应应答里面返回而是在响应头里面会返回一个set cookie 如下 HTTP 1 1 200 OK Server hsiar Date Fri 3
  • Unity打开Visual Stutio 2019很慢,项目工程打开失败

    最近遇见Unity打开VS Visual Stutio 很慢 并且还打不开源代码工程解决方案 1 出问题电脑的情况如下 1 1 Win7系统 1 2 安装了VS2019和VS2022两个版本 2 经过反复测试 发现如下操作会出现Unity打
  • 第二章:CompletableFuture

    Future接口理论知识复习 Future接口 FutureTask实现类 定义了操作异步任务执行一些方法 如获取异步任务的执行结果 取消任务的执行 判断任务是否被取消 判断任务执行是否完毕等 比如主线程让一个子线程去执行任务 子线程可能比