假设我有两个大型 RDD,A 和 B,包含键值对。我想使用密钥连接 A 和 B,但是在匹配的 (a,b) 对中,我只想要一小部分“好”的。所以我进行连接并随后应用过滤器:
A.join(B).filter(isGoodPair)
where isGoodPair
是一个布尔函数,它告诉我一对 (a,b) 是否良好。
为了很好地扩展,Spark 的调度程序最好避免在A.join(B)
明确地。即使在大规模分布式的基础上,这也可能导致耗时的磁盘溢出,甚至耗尽某些节点上的所有内存和磁盘资源。为了避免这种情况,Spark 应该在每个分区内生成对 (a,b) 时应用过滤器。
我的问题:
- Spark 真的这样做吗?
- 其架构的哪些方面可以实现或阻止所需的行为?
- 我应该使用
cogroup
反而?在 PySpark 中,它返回一个迭代器,因此我可以将过滤器应用于迭代器,对吧?
我在 PySpark shell(运行 Spark 1.2.1)中进行了一个实验来回答这些问题。结论如下:
- 不幸的是,Spark 确实not当连接生成对时应用过滤器。它在继续过滤连接对之前显式生成整个连接对集。
- 这可能是因为 Spark 一次运行一次 RDD 转换。它通常无法执行这种微妙的链接优化。
- 通过使用
cogroup
代替join
,我们可以手动实现想要的优化。
实验
我制作了一个包含 100 个组的 RDD,每个组包含 1 到 10,000 的整数,并且在每个组中我计算了最多相距 1 的整数的数量:
import itertools as it
g = int(1e2) # number of groups
n = int(1e4) # number of integers in each group
nPart = 32 # standard partitioning: 8 cores, 4 partitions per core
A = sc.parallelize(list(it.product(xrange(g),xrange(n))),nPart)
def joinAndFilter(A):
return A.join(A).filter(lambda (k,(x1,x2)): abs(x1 - x2) <= 1)
def cogroupAndFilter(A):
def fun(xs):
k,(xs1,xs2) = xs
return [(x1,x2) for (x1,x2) in it.product(xs1,xs2) if abs(x1 - x2) <= 1]
return A.cogroup(A).flatMap(fun)
cogroupAndFilter(A).count()
joinAndFilter(A).count()
我没有简单的方法来分析代码,所以我只是在我的 Mac 上的“活动监视器”中观察它的运行情况:
当我使用时,内存使用量激增joinAndFilter
,大概是因为它在应用相差一过滤器之前生成了所有对。事实上,我不得不杀死 PySpark,因为它耗尽了我所有的内存,并且即将导致系统崩溃。和cogroupAndFilter
,这些对在生成时就被过滤,因此内存保持在控制之下。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)