在 scala 中,当我在 scala 中使用全局映射变量而不进行广播时会发生什么?
例如。如果我使用变量collect*
(例如collectAsMap
),看来它是一个全局变量,我可以在所有地方使用它RDD.mapValues()
函数无需显式广播它。
但我知道 Spark 是分布式工作的,它不应该能够在不广播全局内存存储变量的情况下处理它。所以发生了什么事?
代码示例(此代码在文本中调用 tf-idf,其中 df 存储在 Map 中):
//dfMap is a String->int Map in memory
//Array[(String, Int)] = Array((B,2), (A,3), (C,1))
val dfMap = dfrdd.collectAsMap;
//tfrdd is a rdd, and I can use dfMap in its mapValues function
//tfrdd: Array((doc1,Map(A -> 3.0)), (doc2,Map(A -> 2.0, B -> 1.0)))
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.getOrElse(x._1, 1) ) );
tfidfrdd.saveAsTextFile("/somedir/result/");
该代码工作得很好。我的问题是那里发生了什么?驱动程序是否像广播一样将 dfMap 发送给所有工作人员?
如果我像这样明确地编写广播代码有什么区别:
dfMap = sc.broadcast(dfrdd.collectAsMap)
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.value.getOrElse(x._1, 1) )