当我们尝试从参与者的接收方法中启动多个 future 时,我们观察到了奇怪的行为。
如果我们使用配置的调度程序作为 ExecutionContext,则 futures 将在同一线程上按顺序运行。如果我们使用 ExecutionContext.Implicits.global,则 future 将按预期并行运行。
我们将代码简化为以下示例(更完整的示例如下):
implicit val ec = context.getDispatcher
Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }
Future {
Future{ doWork() }
Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
Future{ doWork() }
Future{ doWork() }
}
一个可编译的例子是这样的:
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}
object WhyNotParallelExperiment extends App {
val actorSystem = ActorSystem(s"Experimental")
// Futures not started in future: running in parallel
startFutures(runInFuture = false)(actorSystem.dispatcher)
Thread.sleep(5000)
// Futures started in future: running in sequentially. Why????
startFutures(runInFuture = true)(actorSystem.dispatcher)
Thread.sleep(5000)
actorSystem.terminate()
private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
if (runInFuture) {
Future{
println(s"Start Futures on thread ${Thread.currentThread().getName()}")
(1 to 9).foreach(startFuture)
println(s"Started Futures on thread ${Thread.currentThread().getName()}")
}
} else {
(11 to 19).foreach(startFuture)
}
}
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
我们尝试了 thread-pool-executor 和 fork-join-executor,得到了相同的结果。
我们是否以错误的方式使用期货?
那么您应该如何生成并行任务呢?
从Akka内部的描述来看BatchingExecutor(强调我的):
执行器的 Mixin 特征组多个嵌套Runnable.run()
调用传递给原始 Executor 的单个 Runnable。这可能是一个有用的优化,因为它绕过原始上下文的任务队列并将相关(嵌套)代码保留在单个线程上,这可能会提高 CPU 亲和力。但是,如果传递给执行器的任务是阻塞的或昂贵的,则这种优化可以防止工作窃取并使性能变差......如果代码不使用批处理执行器,则可能会造成死锁scala.concurrent.blocking
当它应该时,因为在其他任务中创建的任务将阻止外部任务完成。
如果您使用的是混合的调度程序BatchingExecutor
--即,一个子类MessageDispatcher--你可以使用scala.concurrent.blocking
构造以实现与嵌套 Future 的并行性:
Future {
Future {
blocking {
doBlockingWork()
}
}
}
在您的示例中,您将添加blocking
in the startFuture
method:
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
blocking {
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
运行的示例输出startFutures(true)(actorSystem.dispatcher)
经过上述更改:
Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)