purpose:
我想加载流数据,然后添加一个键,然后按键计数。
problem:
当我尝试使用流方法(无界数据)加载和按键分组大数据时,Apache Beam Dataflow pipline 出现内存错误
。因为看起来数据是在 group-by 中累积的,并且它不会在触发每个窗口时更早地触发数据。
如果我减小元素大小(元素计数不会改变),它就会起作用!因为实际上分组会等待所有数据被分组,然后触发所有新的窗口数据。
我测试了两者:
beam 版本 2.11.0 和 scio 版本 0.7.4
beam 版本 2.6.0 和 scio 版本 0.6.1
重新生成错误的方法:
- 读取包含文件名的 Pubsub 消息
- 作为逐行迭代器从 GCS 读取并加载相关文件
- 逐行展平(因此生成大约 10,000 个)元素
- 向元素添加时间戳(当前即时时间)
- 创建我的数据的键值(使用 1 到 10 的一些随机整数键)
- 应用带触发的窗口(行数较小且无内存问题时会触发约 50 次)
- 每个键计数(按键分组然后将它们组合)
- 最后,我们应该有大约 50 * 10 的元素来表示按窗口和键的计数(当行大小足够小时测试成功)
管道的可视化(步骤 4 至 7):
按键分组步骤摘要:
正如您所看到的,数据是逐组累积的,并且不会被发出。
窗口代码在这里:
val windowedData = data.applyKvTransform(
Window.into[myt](
Sessions.withGapDuration(Duration.millis(1)))
.triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))
).orFinally(AfterWatermark.pastEndOfWindow())
).withAllowedLateness(Duration.standardSeconds(100))
.discardingFiredPanes()
)
错误:
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
是否有任何解决方案可以通过强制 group-by 发出每个窗口的早期结果来解决内存问题。
KeyCommitTooLargeException不是内存问题,而是protobuf序列化问题。 Protobuf 的对象限制为 2GB(谷歌protobuf最大大小 https://stackoverflow.com/questions/34128872/google-protobuf-maximum-size)。 Dataflow 发现管道中单个键的值大于 2GB,因此无法对数据进行混洗。错误消息指出“这可能是由于在不使用合并的情况下将大量数据分组到单个窗口中,或者从单个输入元素生成大量数据造成的。”根据您的管道设置(即分配的随机密钥),更有可能是后者。
管道可能已从 GCS 读取大文件(>2GB)并将其分配给随机密钥。 GroupByKey 需要一个密钥洗牌操作,而 Dataflow 由于 protobuf 限制而无法执行,因此卡在该密钥上并保留水印。
如果单个键的值很大,您可能需要减小值的大小,例如压缩字符串,或者将字符串拆分为多个键,或者首先生成较小的 GCS 文件。
如果大值来自多个键的分组,您可能需要增加键空间,以便每个按键分组操作最终将更少的键分组在一起。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)