我开始使用 Spark Streaming 来处理我收到的实时数据源。我的场景是,我有一个使用“with ActorHelper”的 Akka actor 接收器,然后我让 Spark 作业执行一些映射和转换,然后我想将结果发送给另一个 actor。
我的问题是最后一部分。当尝试发送给另一个参与者时,Spark 引发异常:
15/02/20 16:43:16 警告 TaskSetManager:在阶段 2.0 中丢失任务 0.0(TID 2,本地主机):java.lang.IllegalStateException:尝试反序列化范围内没有 ActorSystem 的序列化 ActorRef。使用 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'
我创建最后一个演员的方式如下:
val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")
然后像这样使用它:
result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))
我不确定在哪里或如何执行建议“Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'”。我需要通过配置设置什么特别的吗?或者以不同的方式创造我的演员?
查看以下示例以访问 Spark 域之外的参与者。
/*
* 以下是使用actorStream插入自定义actor作为接收者
*
* 需要注意的重要一点:
* 由于Actor可能存在于spark框架之外,因此这是用户的责任
* 确保类型安全,即接收到的数据类型和InputDstream
* 应该是一样的。
*
* 例如:actorStream和SampleActorReceiver都是参数化的
* 为相同类型以确保类型安全。
*/
val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)