以下代码片段是获取目录列表、对每个文件调用提取方法并将生成的药物对象序列化为 xml 的方法的一部分。
try(Stream<Path> paths = Files.list(infoDir)) {
paths
.parallel()
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
}
这是完全相同的代码,执行完全相同的操作,但使用普通的.list()
调用以获取目录列表并调用.parallelStream()
在结果列表中。
Arrays.asList(infoDir.toFile().list())
.parallelStream()
.map(f -> infoDir.resolve(f))
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
我的机器是四核 MacBook Pro,Java v 1.8.0_60(内部版本 1.8.0_60-b27)。
我正在处理约 7000 个文件。 3次运行的平均值:
第一个版本:
和.parallel()
:20秒。没有.parallel()
:41秒
第二个版本:
和.parallelStream()
:12秒。和.stream()
:41秒。
并行模式下的那 8 秒似乎是一个巨大的差异,因为extract
从流中读取并完成所有繁重工作的方法write
进行最终写入的调用没有改变。
问题是 Stream API 的当前实现以及IteratorSpliterator
对于未知大小的源,严重地将此类源拆分为并行任务。您很幸运拥有超过 1024 个文件,否则您将根本没有并行化的好处。当前的 Stream API 实现考虑了estimateSize()
返回值来自Spliterator
. The IteratorSpliterator
未知大小的回报Long.MAX_VALUE
在 split 之前,其后缀总是返回Long.MAX_VALUE
以及。其分裂策略如下:
- 定义当前批量大小。当前公式是从 1024 个元素开始,然后按算术增加(2048、3072、4096、5120 等),直到
MAX_BATCH
已达到大小(即 33554432 个元素)。
- 将输入元素(在您的情况下为路径)消耗到数组中,直到达到批量大小或输入耗尽。
- 返回一个
ArraySpliterator
迭代创建的数组作为前缀,将其自身作为后缀。
假设您有 7000 个文件。 Stream API 要求估计大小,IteratorSpliterator
回报Long.MAX_VALUE
。好的,Stream API 询问IteratorSpliterator
为了分割,它从底层收集 1024 个元素DirectoryStream
到数组并拆分为ArraySpliterator
(估计大小 1024)及其本身(估计大小仍然是Long.MAX_VALUE
). As Long.MAX_VALUE
远大于 1024,Stream API 决定继续分割较大的部分,甚至不尝试分割较小的部分。所以整体的分裂树是这样的:
IteratorSpliterator (est. MAX_VALUE elements)
| |
ArraySpliterator (est. 1024 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 2048 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 3072 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 856 elements) IteratorSpliterator (est. MAX_VALUE elements)
|
(split returns null: refuses to split anymore)
因此,之后有 5 个并行任务需要执行:实际上包含 1024、2048、3072、856 和 0 个元素。请注意,即使最后一个块有 0 个元素,它仍然报告它估计有Long.MAX_VALUE
元素,因此 Stream API 会将其发送到ForkJoinPool
以及。不好的是 Stream API 认为进一步分割前四个任务是没有用的,因为它们的估计大小要小得多。因此,您得到的输入分割非常不均匀,最多使用四个 CPU 核心(即使您有更多核心)。如果每个元素的处理对于任何元素都花费大致相同的时间,那么整个过程将等待最大的部分(3072 个元素)完成。因此,您可能获得的最大加速是 7000/3072=2.28 倍。因此,如果顺序处理需要 41 秒,那么并行流将花费大约 41/2.28 = 18 秒(接近实际数字)。
您的解决方案完全没问题。请注意,使用Files.list().parallel()
你也有所有的输入Path
存储在内存中的元素(在ArraySpliterator
对象)。因此,如果您手动将它们转储到List
。数组支持的列表实现,例如ArrayList
(目前由Collectors.toList()
)可以毫无问题地均匀分割,从而带来额外的加速。
为什么这样的情况没有优化呢?当然,这不是不可能的问题(尽管实现可能相当棘手)。对于 JDK 开发人员来说,这似乎并不是一个优先考虑的问题。邮件列表中对此主题进行了多次讨论。您可以阅读 Paul Sandoz 的留言here http://mail.openjdk.java.net/pipermail/core-libs-dev/2015-July/034539.html他对我的优化工作发表了评论。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)