您可以使用 for 理解,如下所示:
val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}
val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)
在此示例中,期货 1、2 和 3 并行启动。然后,在 for 理解中,我们等待结果 1、然后 2、然后 3 可用。如果 1 或 2 失败,我们将不再等待 3。如果 3 项都成功,那么aggFut
val 将保存一个具有 3 个槽的元组,对应于 3 个 future 的结果。
现在,如果您需要在 fut2 首先失败时停止等待的行为,事情会变得有点棘手。在上面的示例中,您必须等待 fut1 完成才能意识到 fut2 失败。为了解决这个问题,你可以尝试这样的事情:
val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}
def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)
fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)
case Success(value) =>
processFutures(futures - value, value :: values, prom)
case Failure(ex) => prom.failure(ex)
}
prom.future
}
val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}
现在这可以正常工作,但问题在于知道哪个Future
从中删除Map
当其中一项已成功完成时。只要您有某种方法可以将结果与产生该结果的未来正确关联起来,那么这样的事情就可以了。它只是递归地不断从 Map 中删除已完成的 Futures,然后调用Future.firstCompletedOf
关于剩余的Futures
直到没有人留下为止,沿途收集结果。这并不漂亮,但如果你真的需要你正在谈论的行为,那么这个或类似的东西可以工作。