Dataflow 2.1.0 中是否有 IntrabundleParallelization 的替代方案?

2024-04-19

根据 dataflow 2.X 的发行说明,IntraBundleParallelization 已被删除。有没有办法控制/增加数据流 2.1.0 上 DoFns 的并行度?

当我在 1.9.0 版本的数据流上使用 IntrabundleParallelization 时,我获得了更好的性能。


它被删除是因为它的实现保留了对ProcessContext of a ProcessElement调用完成后再调用,这是不安全的并且不能保证有效。

然而,我同意这是一个有用的抽象,不幸的是我们还没有替代品。

作为解决方法,您可以尝试以下操作:

  • 在你的DoFn中@Setup,创建一个Executor与所需的线程数
  • 在你的DoFn中@StartBundle,创建一个ExecutorCompletionService包裹执行者
  • In @ProcessElement,提交一个Future代表处理该元素的结果
  • In @ProcessElement, also poll() the CompletionService完成期货并输出结果
  • In @FinishBundle,等待所有剩余的 future 完成,输出结果,并关闭CompletionService.

记得not使用ProcessContext在你的未来。ProcessContext只能在当前线程和当前线程内使用ProcessElement call.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dataflow 2.1.0 中是否有 IntrabundleParallelization 的替代方案? 的相关文章

随机推荐

  • 在SURF中使用欧几里德距离

    在我的代码中 我根据最近邻距离比过滤好图像 如下所示 for int i 0 i lt min des image rows 1 int matches size i if matches i 0 distance lt 0 6 match
  • 如何杀死 Linux 中特定端口上运行的进程?

    我尝试使用关闭tomcat shutdown sh来自雄猫 bin目录 但发现服务器没有正常关闭 因此我无法重新启动我的 tomcat 正在端口上运行8080 我想杀死正在运行的tomcat进程8080 我首先想要获得在特定端口 8080
  • 为什么我不能将反向迭代器转换为正向迭代器?

    嗯 我知道为什么 是因为没有转化 但是为什么没有转化呢 为什么正向迭代器可以变成反向迭代器 但反之则不行 更重要的是 如果我想这样做 我该怎么办 是否有一些适配器允许您使用前向迭代器向后迭代 std vector
  • 在 Android 中隐藏 RadioButton

    我想将 RadioButton 的可见性设置为 INVISIBLE 或 GONE 由于某种原因 这不起作用 RadioButton myRadioButton RadioButton findViewById R id my radio b
  • 没有什么可以帮助解决 Qt 中的“对 vtable 的未定义引用”

    我无法构建此错误 未定义对 CustomUndoStack 的 vtable 的引用 这是代码 class CustomUndoStack public QObject Q OBJECT public int main int argc c
  • ANSI SQL 问题 - 如果记录已存在,如何插入或更新记录?

    虽然我 目前 使用 mySQL 但我不需要任何数据库特定的 SQL 我试图插入一条记录 如果不存在 并更新一个字段 如果存在 我想使用 ANSI SQL 该表看起来像这样 create table test table id int nam
  • Greasemonkey:XPath 未返回 .xhtml 页面的结果

    在 xhtml 页面上运行时 xpath 似乎没有返回任何结果 var result document evaluate a img document body null XPathResult ORDERED NODE SNAPSHOT
  • 在 Windows Azure 上使用quartz.net

    我在我的 asp net 应用程序中使用quartz net 当我在本地主机上运行时 quartz 调度程序可以工作 但是当我将站点发布到 windows azure 时 它 不再工作 任何帮助将非常感激 问题不在于它没有在天蓝色站点上运行
  • /etc/fstab 中的动态 IP

    我正在尝试在 AWS 中使用弹性文件系统 EFS 我的目标是使用 etc fstab 自动挂载它 由于 EC2 实例会跨可用区域自动扩展 因此 EFS 挂载 IP 会根据实例的区域而变化 目前AWS提供了这个命令来将其安装到正确的区域 su
  • C++/CLI:#pragma 托管/非托管范围

    我有一个混合模式 DLL 其中有一个包含托管和非托管代码的 cpp 文件 一个简化的重现示例如下所示 include stdafx h pragma managed Just for explicitness doesn t influen
  • 如何从 launch.json 传递带有特殊字符的参数?

    我试图通过 launch json 将参数传递给我的 Python 程序 并且我的参数之一需要特殊字符 因为它是密码 我计划添加更安全的方式来输入密码 但这不是重点 这是我的 launch json 密码已更改 但仍带有特殊字符 Use I
  • 通过 Paramiko SSH 的 SQLAlchemy

    我的服务器上有一个数据库 我需要通过 SSH 访问该数据库 现在我通过使用命令行来获取数据来处理数据库 import paramiko ssh paramiko SSHClient ssh set missing host key poli
  • Mockito 并不是在嘲笑电话

    我正在测试的功能之一是通过 ssh 登录到一台机器 我想模拟 ping 方法 它实际上尝试 ssh 进入机器 因为我并没有真正 ssh 进入机器 我正在测试的类 public class TestMachine public int pin
  • 如何阻止UITextView进入时向上滚动

    我有一个UITextView包含在一个UITableViewCell 最初显示视图时布局是正确的 但是一旦我单击UITextView它会自动向上滚动一点 第一行的上半部分字符变得不可见 这张图是当UITextView不活跃 UITextVi
  • NSDateFormatter 获取 Null

    我在我的项目中得到一个字符串 我将字符串传递给 dateFormatter 但我得到 nil 在字符串中而不是IST我可能会得到PDT我可能会得到任何其他格式 NSString currentDateString 2012 11 09 12
  • 分割scrapy的大CSV文件

    是否可以使 scrapy 写入每个不超过 5000 行的 CSV 文件 我怎样才能给它一个自定义的命名方案 我应该修改吗CsvItemExporter 尝试这个管道 coding utf 8 Define your item pipelin
  • “setIndentationLeft”不是“iTextsharp.text.Paragraph”的成员

    我正在添加出现错误的段落 p setindentationLeft 不是 itextsharp text paragraph 的成员 Dim bf As BaseFont BaseFont CreateFont Dim p As New P
  • MVC WebGrid 是开源的吗?

    MVC WebGrid 是开源的吗 如果可以的话哪里可以找到源代码 当前版本的 WebGrid 几乎没有文档 实现不完整等 我正在尝试做一件简单的事情 即向视图中的 WebGrid Column 添加可见性属性 但这是不可能的 您可以下载A
  • 如何在字符串中使用变量

    我正在尝试在字符串中实现变量 我已经搜索过这个并尝试向它扔不同的东西 但似乎没有任何效果 我有一个如下所示的变量 http localhost 8080 Editor name Default 我使用以下方法获取值没有问题 function
  • Dataflow 2.1.0 中是否有 IntrabundleParallelization 的替代方案?

    根据 dataflow 2 X 的发行说明 IntraBundleParallelization 已被删除 有没有办法控制 增加数据流 2 1 0 上 DoFns 的并行度 当我在 1 9 0 版本的数据流上使用 IntrabundlePa