它是这个问题的延续:将多线程计算密集型作业移植到 Spark https://stackoverflow.com/questions/32276856/porting-a-multi-threaded-compute-intensive-job-to-spark
我在用forEachPartition
按照建议here https://stackoverflow.com/a/32277967/231917循环遍历 10000 个 ID 的列表,然后我执行repartition(20)
因为每个分区都会创建数据库连接,如果我创建 100 个分区,那么该作业就会因为 100 个与 postgres 和 mongo 的打开连接而终止。我使用 postgres 连接不仅可以存储数据,还可以从另一个表中查找一些数据。
我可以摆脱直接从我的任务将数据存储到 postgres 的情况,并将其作为序列文件的后处理。
但理想情况下,我需要大规模并行化我的 Spark 作业,以便任务在给定时间内完成,目前它在 20 小时内处理大约 200 个 ID,但我需要在 20 小时内处理 10000 个 ID。所以repartition(20)
显然没有帮助。我在这里受 db 上的 IO 约束。
那么我可以选择哪些选项来在所有任务中有效地共享这些数据呢?我希望将 mongo 和 postgres 中的数据视为内存查找表中的数据 - 总大小约为 500GB。
我的选择是:
- RDD(我认为 RDD 不适合我的用例)
- 数据框
- 广播变量(不确定这是否有效,因为它的创建需要 Spark 驱动程序中有 500GB 可用空间)
- 将数据从 mongodb 移动到 s3 并从 s3 查找任务。
我们解决此类问题所遵循的技术是:
- 将查找存储在 MongoDB 的不同集合中。
- 使用 Hadoop MongoDB 连接器从 MongoDB 获取数据并将其存储在 RDD 中
- 广播变量,以便所有节点/工作人员都可以使用它
- 现在,如果数据位于 HDFS 中,则为其创建一个 RDD,或者如果数据位于 MongoDB 中,则使用 Hadoop MongoDB 连接器。
- 现在执行查找匹配部分
- 将文件保存为序列文件,或者您也可以将其保存在 S3 上,需要在我们将其存储回 MongoDB 时检查它
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)