当我在 scala 中使用全局映射变量而不广播时会发生什么

2024-06-22

在 scala 中,当我在 scala 中使用全局映射变量而不进行广播时会发生什么?

例如。如果我使用变量collect*(例如collectAsMap),看来它是一个全局变量,我可以在所有地方使用它RDD.mapValues()函数无需显式广播它。

但我知道 Spark 是分布式工作的,它不应该能够在不广播全局内存存储变量的情况下处理它。所以发生了什么事?

代码示例(此代码在文本中调用 tf-idf,其中 df 存储在 Map 中):

//dfMap is a String->int Map in memory
//Array[(String, Int)] = Array((B,2), (A,3), (C,1))
val dfMap = dfrdd.collectAsMap;
//tfrdd is a rdd, and I can use dfMap in its mapValues function
//tfrdd: Array((doc1,Map(A -> 3.0)), (doc2,Map(A -> 2.0, B -> 1.0)))
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.getOrElse(x._1, 1) ) ); 
tfidfrdd.saveAsTextFile("/somedir/result/");

该代码工作得很好。我的问题是那里发生了什么?驱动程序是否像广播一样将 dfMap 发送给所有工作人员?

如果我像这样明确地编写广播代码有什么区别:

dfMap = sc.broadcast(dfrdd.collectAsMap)
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.value.getOrElse(x._1, 1) ) 

我检查了更多资源并汇总了其他人的答案并将其按顺序排列。直接使用外部变量(作为我所谓的“全局变量”)和使用 sc.broadcast() 广播变量之间的区别如下:

1)当直接使用外部变量时,spark将随每个任务一起发送序列化变量的副本。而通过 sc.broadcast,变量会向每个 EXECUTOR 发送一份副本。 Task的数量通常是Executor的10倍。

因此,当变量(比如地图)足够大(超过20K)时,前一个操作可能会花费大量的网络转换时间并导致频繁的GC,从而减慢spark的速度。因此,建议显式广播大变量(>20K)。

2)直接使用外部变量时,该变量不会被持久化,它会随着任务结束而无法重复使用。而通过 sc.broadcast() ,变量会自动保留在执行程序的内存中,它会一直持续到您明确取消保留它为止。因此 sc.broadcast 变量可跨任务和阶段使用。

因此,如果预计该变量会被多次使用,建议使用 sc.broadcast() 。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

当我在 scala 中使用全局映射变量而不广播时会发生什么 的相关文章

随机推荐