Akka Stream Graph 恢复问题

2024-05-13

我创建了一个图表来并行化具有相同输入的两个流。这些流产生 Future[Option[Entity]]。如果 flowA 失败,我想返回 Future[None] 但恢复似乎没有被调用

    val graph: Flow[Input, (Future[Option[Entity]], Future[Option[Entity]]), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Input](2))
    val zip = builder.add(Zip[Future[Option[Entity]], Future[Option[Entity]]])

    val flowAwithRecovery = flowA.recover{ case t: Throwable =>
      logger.error(t, "Error retrieving output from flowA. Resuming without them.")
      Future.successful(None)
    }

    broadcast.out(0) ~> flowAwithRecovery ~> zip.in0
    broadcast.out(1) ~> flowB ~> zip.in1

    FlowShape(broadcast.in, zip.out)
  })

当我运行图表并且 flowA 返回失败的 Future 时,恢复不会被执行。作为一种解决方法,我在处理结束时恢复 Future,但我想在设计图表时放置这种逻辑。


The recover当异常从上游传播时,组合器就会发挥作用。 AFuture.failed不是一个例外,而是一个有效的元素。 你需要类似的东西

flowA.map(_.recover{ case t: Throwable =>
      logger.error(t, "Error retrieving output from flowA. Resuming without them.")
      None
    })

另一方面,你真的需要传递吗?Future在你的流程中吗?你可能最好使用mapAsync建造时flowA and flowB让他们只生产Option[Entity].

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Akka Stream Graph 恢复问题 的相关文章

随机推荐