处理数据流中一对多阶段的正确方法

2024-02-03

我有一个(Java)批处理管道,它遵循以下模式:

(FileIO)
(ExtractText > input=1 file, output=millions of lines of text)
(ProcessData)

ProcessData 阶段包含缓慢的部分(将数据与大白名单匹配),并且需要在多个工作线程上进行扩展,这应该不是问题,因为它只包含 DoFns。然而,我的一对多阶段似乎强制所有输出仅由一名工作人员处理(实例化更多工作人员会使它们除一名工作人员外全部闲置,或者如果启用自动缩放,则缩小规模)。

基于其他 stackoverflow 条目,我尝试通过Reshuffle.viaRandomKey()。这不起作用,因为Reshuffle包含一个GroupByKey它将所有结果加载到内存中,导致 OOM,即使我预先通过Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))

另一种选择是创建一个 CustomSource 来替换前两个阶段,但我发现这种方法不够充分,因为 1) 自定义源的文档严重缺乏 2) 需要更多的时间和代码来实现 3) 这种一对多在管道中间很可能会遇到问题,我无法创建自定义源。

我应该如何处理数据流管道中的一对多阶段?


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

处理数据流中一对多阶段的正确方法 的相关文章

随机推荐

  • 如何处理 AWS Athena 中的嵌入换行符

    我在 AWS Athena 中创建了一个表 如下所示 CREATE EXTERNAL TABLE IF NOT EXISTS default test line breaks col1 string col2 string ROW FORM
  • 是否可以训练斯坦福 NER 系统来识别更多命名实体类型?

    我现在正在使用一些 NLP 库 stanford 和 nltk 斯坦福大学我看到了演示部分 但只是想问是否可以使用它来识别更多实体类型 因此 目前斯坦福的 NER 系统 如演示所示 可以将实体识别为人 名称 组织或位置 但认可的组织仅限于大
  • 从给定节点开始有向图的 BFS 遍历

    我的基本理解广度优先搜索图的遍历是 BFS Start from any node Add it to queue Add it to visited array While queue is not empty Remove head f
  • 添加用于将 Word 注释中的标题提取到 Excel 中的代码

    我有一些代码用于将 Word 中的注释提取到 Excel 中 但是 它只提取一级标题 直接标题 我可以添加哪些代码来提取 Excel 中不同列中的不同标题级别 我可以按样式选择这些不同的标题级别吗 如果我使用 MyOwnHeading 样式
  • Maven:在 pom.xml 中配置并行构建

    Maven 具有执行并行构建的能力 https cwiki apache org confluence display MAVEN Parallel builds in Maven 3 https cwiki apache org conf
  • jQuery 支持“:invalid”选择器

    我收到以下控制台消息 16 04 01 292 Error Syntax error unrecognized expression unsupported pseudo invalid http localhost 8080 assets
  • JHipster:将根域重定向到 www

    我正在从事搜索引擎优化工作 我想https pomzen com https pomzen com被重定向到https www pomzen com https www pomzen com 是否可以在 JHipster 项目中完成 还是在
  • 防止表单提交后重新加载页面

    有没有办法检测并停止页面是否正在重新加载 我有一个页面 在成功提交其中存在的表单后正在重新加载 我想要一个事件侦听器来查看页面是否正在重新加载并应该阻止它重新加载 我不能return false 成功提交注册表 在你的html中
  • 当 Svelte 重用父 dom 元素时如何确保仅本地转换

    在 Svelte 中 我有一个组件用于显示两个不同列表中的项目 当这些项目从一个列表移动到另一个列表时 它们使用过渡来动画进入或退出 不过 我还有一种方法可以过滤屏幕上显示的内容 显示一组新的项目将使用相同的组件 但具有不同的数据 在这种情
  • 在 ASMX 中测试自定义 SOAP 标头

    ASMX生成的测试表单对于测试操作来说非常方便 然而 没有明显的方法来包含 SOAP 标头 如何在不编写客户端程序来使用该服务的情况下测试标头 如果您关心互操作性 请不要使用 net 客户端应用程序来测试 net Web 服务 使用 SOA
  • asp.net-mvc 在后期操作中获取字典或如何将 FormCollection 转换为字典

    任何人都知道如何改变FormCollection into a IDictionary或者如何获得IDictionary在后期行动中 这只是 Omnu 代码的等价物 但对我来说似乎更优雅 Dictionary
  • 如何重新加载当前状态?

    我正在使用 Angular UI Router 并且想要重新加载当前状态并刷新所有数据 重新运行当前状态及其父级的控制器 我有 3 个州级别 目录 组织 详细信息 目录 组织包含一个包含组织列表的表 单击表中的项目加载目录 组织 详细信息使
  • Cordova - 如何不出现闪屏?

    我不希望我的 Cordova 项目 Android 和 iOS 出现启动屏幕 如何删除它 我尝试禁用启动画面插件 但它仍然出现 怎么解决
  • 安装Oracle表单并出现错误。无法启动安装程序 (555)

    甲骨文形式下载地址 http www oracle com technetwork developer tools forms downloads index html http www oracle com technetwork dev
  • Json.Net布尔解析问题

    JObject Parse jsonString 导致布尔数据出现问题 例如json 是 BoolParam true 我用下面的代码来解析 JObject data JObject Parse str1 foreach var x in
  • Python 拼凑将 UCS-2 (UTF-16?) 读取为 ASCII

    我对这个问题有点不知所措 所以请提前原谅我的术语 我在 Windows XP 上使用 Python 2 7 运行它 我发现一些 Python 代码可以读取日志文件 执行一些操作 然后显示一些内容 什么 这还不够详细吗 好的 这是一个简化版本
  • falcon python 中的数据传递应用程序

    在提出问题之前 我想提一下 我知道我可以使用 django 来制作应用程序 但我需要使用 falcon 而不是其他 我只是在寻找一种方法 让我们看一个非常简单的场景 以便我可以理解数据如何在应用程序的各个部分之间流动 我有一个使用 html
  • 删除所有重复的行,包括“参考”行[重复]

    这个问题在这里已经有答案了 我正在寻找一种方法来从向量中删除所有重复元素 包括参考元素 经过参考元素 我的意思是当前在比较中使用的元素 以搜索其重复项 例如 如果我们考虑这个向量 a c 1 2 3 3 4 5 6 7 7 8 我想获得 b
  • grpc/protobuffer 请求特定字段

    GraphQL 允许您请求特定字段 响应仅包含您请求的字段 例如 graphql 查询如下 hero name 将返回 data hero name R2 D2 作为 graphQl 查询 例如 hero name friends name
  • 处理数据流中一对多阶段的正确方法

    我有一个 Java 批处理管道 它遵循以下模式 FileIO ExtractText gt input 1 file output millions of lines of text ProcessData ProcessData 阶段包含