无法反序列化 ActorRef 以将结果发送到不同的 Actor

2024-03-23

我开始使用 Spark Streaming 来处理我收到的实时数据源。我的场景是,我有一个使用“with ActorHelper”的 Akka actor 接收器,然后我让 Spark 作业执行一些映射和转换,然后我想将结果发送给另一个 actor。

我的问题是最后一部分。当尝试发送给另一个参与者时,Spark 引发异常:

15/02/20 16:43:16 警告 TaskSetManager:在阶段 2.0 中丢失任务 0.0(TID 2,本地主机):java.lang.IllegalStateException:尝试反序列化范围内没有 ActorSystem 的序列化 ActorRef。使用 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'

我创建最后一个演员的方式如下:

val actorSystem = SparkEnv.get.actorSystem
val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")

然后像这样使用它:

result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _))

我不确定在哪里或如何执行建议“Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'”。我需要通过配置设置什么特别的吗?或者以不同的方式创造我的演员?


查看以下示例以访问 Spark 域之外的参与者。

/* * 以下是使用actorStream插入自定义actor作为接收者 * * 需要注意的重要一点: * 由于Actor可能存在于spark框架之外,因此这是用户的责任 * 确保类型安全,即接收到的数据类型和InputDstream * 应该是一样的。 * * 例如:actorStream和SampleActorReceiver都是参数化的 * 为相同类型以确保类型安全。 */

val lines = ssc.actorStream[String](
  Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
    host, port.toInt))), "SampleReceiver")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法反序列化 ActorRef 以将结果发送到不同的 Actor 的相关文章

随机推荐