我有以下代码:
val blueCount = sc.accumulator[Long](0)
val output = input.map { data =>
for (value <- data.getValues()) {
if (record.getEnum() == DataEnum.BLUE) {
blueCount += 1
println("Enum = BLUE : " + value.toString()
}
}
data
}.persist(StorageLevel.MEMORY_ONLY_SER)
output.saveAsTextFile("myOutput")
然后 blueCount 不为零,但我没有 println() 输出!我在这里错过了什么吗?谢谢!
这是一个概念性问题...
想象一下,您有一个大集群,由许多工作人员组成,比方说n
工作人员和这些工作人员存储一个分区RDD
or DataFrame
,想象你开始一个map
跨该数据及其内部的任务map
你有一个print
声明,首先:
- 这些数据将在哪里打印出来?
- 什么节点有优先级,什么分区?
- 如果所有节点并行运行,谁会先被打印?
- 这个打印队列是如何创建的?
这些问题太多了,因此设计者/维护者apache-spark
合乎逻辑地决定放弃任何支持print
任何内的语句map-reduce
操作(这包括accumulators
乃至broadcast
变量)。
这也是有道理的,因为 Spark 是一种语言designed对于非常大的数据集。虽然打印对于测试和调试很有用,但您不会希望打印 DataFrame 或 RDD 的每一行,因为它们被构建为具有数百万或数十亿行!那么,当您一开始就不想打印时,为什么要处理这些复杂的问题呢?
为了证明这一点,您可以运行以下 scala 代码:
// Let's create a simple RDD
val rdd = sc.parallelize(1 to 10000)
def printStuff(x:Int):Int = {
println(x)
x + 1
}
// It doesn't print anything! because of a logic design limitation!
rdd.map(printStuff)
// But you can print the RDD by doing the following:
rdd.take(10).foreach(println)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)