根据 dataflow 2.X 的发行说明,IntraBundleParallelization 已被删除。有没有办法控制/增加数据流 2.1.0 上 DoFns 的并行度?
当我在 1.9.0 版本的数据流上使用 IntrabundleParallelization 时,我获得了更好的性能。
它被删除是因为它的实现保留了对ProcessContext
of a ProcessElement
调用完成后再调用,这是不安全的并且不能保证有效。
然而,我同意这是一个有用的抽象,不幸的是我们还没有替代品。
作为解决方法,您可以尝试以下操作:
- 在你的DoFn中
@Setup
,创建一个Executor
与所需的线程数
- 在你的DoFn中
@StartBundle
,创建一个ExecutorCompletionService
包裹执行者
- In
@ProcessElement
,提交一个Future
代表处理该元素的结果
- In
@ProcessElement
, also poll()
the CompletionService
完成期货并输出结果
- In
@FinishBundle
,等待所有剩余的 future 完成,输出结果,并关闭CompletionService
.
记得not使用ProcessContext
在你的未来。ProcessContext
只能在当前线程和当前线程内使用ProcessElement
call.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)