JAVA-- 在Java8 Parallel Stream中如何自定义线程池?

2023-05-16

使用Parallel Stream时,在适当的环境中,通过适当地使用并行度级别,可以在某些情况下获得性能提升。
如果程序创建一个自定义ThreadPool,必须记住调用它的shutdown()方法来避免内存泄漏。

Parallel Stream默认使用的线程池

如下代码示例,Parallel Stream并行处理使用的线程池是ForkJoinPool.commonPool(),这个线程池是由整个应用程序共享的线程池

@Test
    public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
        List<Long> aList = new ArrayList<>();
        Stream<Long> parallelStream = aList.parallelStream();

        assertTrue(parallelStream.isParallel());
    }

如何自定义线程池

简单示例

如下代码示例说明如下:

  • 使用的ForkJoinPool构造函数的并行级别为4。为了确定不同环境下的最佳值,需要进行一些实验,但一个好的经验法则是根据CPU的核数选择数值。
  • 接下来,处理并行流的内容,在reduce调用中对它们进行汇总。

这个简单的示例可能不能充分说明使用自定义线程池的用处,但是在不希望将公共线程池与长时间运行的任务绑定在一起(例如处理来自网络源的数据)或应用程序中的其他组件正在使用公共线程池的情况下,其好处就很明显了。

    @Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());

        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        long actualTotal = customThreadPool.submit(
            () -> aList.parallelStream().reduce(0L, Long::sum)).get();

        assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
    }

复杂点的示例

通过查看日志,可以看到使用了自定义的线程池,提高了并发处理的效率

	@Test
    public void testCustomThreadPool() throws ExecutionException, InterruptedException {
        List<Long> firstRange = LongStream.rangeClosed(1, 10).boxed()
            .collect(Collectors.toList());

        List<Long> secondRange = LongStream.rangeClosed(5000, 6000).boxed()
            .collect(Collectors.toList());

        ForkJoinPool forkJoinPool = new ForkJoinPool(3);
        Future<Long> future = forkJoinPool.submit(() -> {
            return firstRange.parallelStream().map((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在处理 "+number);
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                }finally {
                    return number;
                }
            }).reduce(0L, Long::sum);
        });
        assertEquals((1 + 10) * 10 / 2, future.get());

        forkJoinPool.shutdown();

        ForkJoinPool forkJoinPool2 = new ForkJoinPool(10);

        forkJoinPool2.submit(() -> {
            secondRange.parallelStream().forEach((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在处理 "+number);
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            });
        });
        forkJoinPool2.shutdown();
        TimeUnit.SECONDS.sleep(2);
    }
    
 	private static void print(String msg){
        System.out.println(msg);
    }

求质数

实现方案有如下2种:

  • 将Parallel task 直接提交给自定义的ForkJoinPool中
  • 将自定义线程池传递到完整的future.supplyAsync方法中
public class StreamTest {
    @Test
    public void testCompletableFuture()throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        CompletableFuture<List<Long>> primes = CompletableFuture.supplyAsync(() ->
                //parallel task here, for example
                range(1, 1_000_000).parallel().filter(StreamTest::isPrime).boxed().collect(toList()),
            forkJoinPool
        );
         forkJoinPool.shutdown();
        System.out.println(primes.get());
    }

    @Test
    public void testCustomForkJoinPool() throws InterruptedException {
        final int parallelism = 4;
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            final List<Integer> primes = forkJoinPool.submit(() ->
                // Parallel task here, for example
                IntStream.range(1, 1_000_000).parallel()
                    .filter(StreamTest::isPrime)
                    .boxed().collect(Collectors.toList())
            ).get();
            System.out.println(primes);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }

    private static void print(String msg){
        System.out.println(msg);
    }
}

注意事项:小心内存泄漏【Memory Leak】

正如前面所讨论的,整个应用程序默认使用公共线程池。公共线程池是一个静态ThreadPool实例。
因此,如果使用默认线程池,就不会发生内存泄漏。

但是针对使用自定义线程池的场景下,customThreadPool对象不会被解引用和垃圾收集——相反,它将等待分配新任务【the customThreadPool object won’t be dereferenced and garbage collected — instead, it will be waiting for new tasks to be assigned】
也就是说,每次调用测试方法时,都会创建一个新的customThreadPool对象,并且它不会被释放。

解决这个问题很简单:在执行了这个方法之后关闭customThreadPool对象:

@Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        try {
            long actualTotal = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, Long::sum)).get();

            assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
        }finally {
            customThreadPool.shutdown();
        }
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

JAVA-- 在Java8 Parallel Stream中如何自定义线程池? 的相关文章

  • 如何在 IDEA Intellij 上使用 Spring-boot 自动重新加载

    我写了一个基于Spring boot tomcat freemarker的项目 我运行成功 但是每当我修改一些模板和java类时 我必须重新启动服务器或使用Intellij上的 重新加载更改的类 菜单才能使更改生效 浪费很多时间 然后我尝试
  • 在 Java 中使用 Batik 检查和删除 SVG 中的属性

    这个问题基本上说明了一切 如何检查 SVG 是否具有 viewBox 属性 我正在使用蜡染库 我需要这个 因为我需要 至少 通知用户有一个 viewBox 属性 我可以删除它吗 使用 org w3c dom 类 您可以按照以下方式做一些事情
  • 在 jTextfield 中禁用“粘贴”

    我有一个用 Swing awt 编写的应用程序 我想阻止用户将值粘贴到文本字段中 有没有办法在不使用动作监听器的情况下做到这一点 您可以使用 null 参数调用 setTransferHandler 如下所示 textComponent s
  • 使用 Spring 控制器处理错误 404

    I use ExceptionHandler处理我的网络应用程序抛出的异常 在我的例子中我的应用程序返回JSON回应HTTP status用于对客户端的错误响应 但是 我正在尝试弄清楚如何处理error 404返回与处理的类似的 JSON
  • 即使在轴上进行自动量程调整,我也可以保留积分刻度线吗?

    我 偷 了一些代码here http fxexperience com 2012 01 curve fitting and styling areachart 拥有一个AreaChart我在 FXML 中使用了 平滑线条 它的工作原理如下
  • 具有 JPA 持久性的 Spring 状态机 - 存储库使用

    我试图弄清楚如何轻松使用 Spring 状态机 包括使用 JPA 进行持久化 这是我正在处理的问题 不兼容的数据类型 工厂和持久性 在程序的某个时刻 我想使用连接到用户的状态机 有用于此目的的存储库 项目spring statemachin
  • 在尝试使用 GPS 之前如何检查 GPS 是否已启用

    我有以下代码 但效果不好 因为有时 GPS 需要很长时间 我该如何执行以下操作 检查GPS是否启用 如果启用了 GPS 请使用 GPS 否则请使用网络提供商 如果 GPS 时间超过 30 秒 请使用网络 我可以使用时间或 Thread sl
  • 拆分/标记化/扫描字符串并注意引号

    Java中是否有默认 简单的方法来分割字符串 但要注意引号或其他符号 例如 给定以下文本 There s a man that live next door in my neighborhood and he gets me down Ob
  • 在 Java 中创建 XML 文件的最佳方法是什么?

    我们目前使用 dom4j 来创建 XML 文件 不过 我猜现在有更好的东西了 如果我们使用的是 Java 1 6 或更高版本 那么在编写 XML 文件时最好使用什么类 运行速度最快 使用简单 我不需要构建一个 DOM 然后编写整个 DOM
  • 在 java 中运行外部应用程序但不要等待它完成

    我正在用java编写一个应用程序 允许我运行其他应用程序 为此 我使用了 Process 类对象 但当我这样做时 应用程序会等待进程结束 然后再退出 有没有办法在 Java 中运行外部应用程序 但不等待它完成 public static v
  • 嵌套字段的 Comparator.comparing(...)

    假设我有一个这样的域模型 class Lecture Course course getters class Course Teacher teacher int studentSize getters class Teacher int
  • 如何在不反编译的情况下更改已编译的.class文件?

    我想更改 class 文件方法 我安装 JD Eclipse Decompiler 并打开 class 文件 我添加了一些代码并保存 class 文件 但是 class 文件没有改变 我不知道如何使用反编译器 如果可能的话 如何在不使用反编
  • Java 中 JButton 的击键/热键

    最初我使用 JMenu 并建立热键以使用加速器工作 它运行得很好 现在我想在 JButton 中实现相同的行为 但我陷入困境 这是我编写的代码 请分享您的想法 以便我可以走上正确的道路 import javax swing import j
  • 使用单独的线程在java中读取和写入文件

    我创建了两个线程并修改了 run 函数 以便一个线程读取一行 另一个线程将同一行写入新文件 这种情况会发生直到整个文件被复制为止 我遇到的问题是 即使我使用变量来控制线程一一执行 但线程的执行仍然不均匀 即一个线程执行多次 然后控制权转移
  • 如何减去两个 XmlGregorianCalendar 对象来创建一个 Duration 对象?

    我想计算两个时间之间的差值XmlGregorianCalendar对象 从而创建一个Duration object 但我还没有找到执行减法的干净方法 你会怎么做 那应该是 DatatypeFactory newDuration xgc2 t
  • 让 Hibernate 和 SQL Server 与 VARCHAR 和 NVARCHAR 良好配合

    我目前正在大型数据库的某些表中启用 UTF 8 字符 这些表已经是 MS SQL 类型 NVARCHAR 此外 我还有几个使用 VARCHAR 的字段 Hibernate 与 JDBC 驱动程序的交互存在一个众所周知的问题 例如 参见在 h
  • Axis2 错误:要输出的文本中的空白字符 (0x4) 无效

    我创建了一个 Java 客户端 使用 Axis2 1 7 6 作为代码生成器与 SOAP Web 服务进行交互 问题在于客户端的某些输入抛出异常并显示以下消息 org apache axis2 AxisFault Invalid white
  • spring data jpa复合键重复键记录插入导致更新

    我有一个具有复合键的实体 我试图通过使用 spring data jpa 存储库到 mysql 数据库来持久化它 如下所示 Embeddable public class MobileVerificationKey implements S
  • 在实现使用原始类型的接口时如何避免警告?

    我正在实施流程工厂 http help eclipse org ganymede index jsp topic org eclipse platform doc isv reference api org eclipse debug co
  • 受信任的 1.5 小程序可以执行系统命令吗?

    如果是的话 这个能力有什么限制吗 具体来说 我需要以 Mac OSX 为目标 我以前用过这个在 Windows 系统上启动东西 但从未在 Mac 上尝试过 public void launchScript String args Strin

随机推荐