为了测试 Spark 中的序列化异常,我用两种方式编写了一个任务。
第一种方式:
package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object dd {
def main(args: Array[String]):Unit = {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val data = List(1,2,3,4,5)
val rdd = sc.makeRDD(data)
val result = rdd.map(elem => {
funcs.func_1(elem)
})
println(result.count())
}
}
object funcs{
def func_1(i:Int): Int = {
i + 1
}
}
这样 Spark 的效果就非常好。
当我将其更改为以下方式时,它不起作用并抛出 NotSerializedException。
第二种方式:
package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object dd {
def main(args: Array[String]):Unit = {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val data = List(1,2,3,4,5)
val rdd = sc.makeRDD(data)
val handler = funcs
val result = rdd.map(elem => {
handler.func_1(elem)
})
println(result.count())
}
}
object funcs{
def func_1(i:Int): Int = {
i + 1
}
}
我知道我收到错误“任务不可序列化”的原因是因为我试图发送不可序列化的对象funcs
在第二个示例中从驱动程序节点到工作程序节点。对于第二个例子,如果我创建对象funcs
extend Serializable
,这个错误就会消失。
但在我看来,因为funcs
是一个对象而不是类,它是一个单例,应该被序列化并从驱动程序发送到工作人员,而不是在工作人员节点本身内实例化。在这种情况下,虽然使用对象的方式funcs
是不同的,我猜是不可序列化的对象funcs
在这两个示例中,都是从驱动程序节点传送到工作程序节点。
我的问题是为什么第一个示例可以成功运行,但第二个示例失败并出现“任务不可序列化”异常。