为什么在 Akka Dispatcher 上启动时,Futures in Futures 会按顺序运行

2023-11-27

当我们尝试从参与者的接收方法中启动多个 future 时,我们观察到了奇怪的行为。 如果我们使用配置的调度程序作为 ExecutionContext,则 futures 将在同一线程上按顺序运行。如果我们使用 ExecutionContext.Implicits.global,则 future 将按预期并行运行。

我们将代码简化为以下示例(更完整的示例如下):

implicit val ec = context.getDispatcher

Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }

Future {
   Future{ doWork() } 
   Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
   Future{ doWork() }
   Future{ doWork() }
}

一个可编译的例子是这样的:

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")   

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running in sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(startFuture)
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }


}

我们尝试了 thread-pool-executor 和 fork-join-executor,得到了相同的结果。

我们是否以错误的方式使用期货? 那么您应该如何生成并行任务呢?


从Akka内部的描述来看BatchingExecutor(强调我的):

执行器的 Mixin 特征组多个嵌套Runnable.run()调用传递给原始 Executor 的单个 Runnable。这可能是一个有用的优化,因为它绕过原始上下文的任务队列并将相关(嵌套)代码保留在单个线程上,这可能会提高 CPU 亲和力。但是,如果传递给执行器的任务是阻塞的或昂贵的,则这种优化可以防止工作窃取并使性能变差......如果代码不使用批处理执行器,则可能会造成死锁scala.concurrent.blocking当它应该时,因为在其他任务中创建的任务将阻止外部任务完成。

如果您使用的是混合的调度程序BatchingExecutor--即,一个子类MessageDispatcher--你可以使用scala.concurrent.blocking构造以实现与嵌套 Future 的并行性:

Future {
  Future {
    blocking {
      doBlockingWork()
    }
  }
}

在您的示例中,您将添加blocking in the startFuture method:

private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
  blocking {
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

运行的示例输出startFutures(true)(actorSystem.dispatcher)经过上述更改:

Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么在 Akka Dispatcher 上启动时,Futures in Futures 会按顺序运行 的相关文章

  • Scala:需要类类型,但找到了 T

    我发现了与此特定问题类似的问题 但是该问题是由于有人试图直接实例化 T 造成的 在这里 我试图创建一个特征 它是一个通用接口来扩展类并将它们自动存储在数据库中 例如 Riak 使用classOf T 使用 Scala 2 10 这是我的代码
  • 并行 Haskell - GHC GC 火花

    我有一个正在尝试并行化的程序 带有可运行代码的完整粘贴here http lpaste net 101528 我进行了分析 发现大部分时间都花在findNearest这本质上是一个简单的foldr超过一个大Data Map findNear
  • 在 Spark 中将流式 XML 转换为 JSON

    我是 Spark 新手 正在开发一个简单的应用程序 将从 Kafka 接收的 XML 流转换为 JSON 格式 Using 火花2 4 5 斯卡拉 2 11 12 在我的用例中 kafka 流采用 xml 格式 以下是我尝试过的代码 val
  • 如何并行安装/编译 pip 要求(使 -j 等效)

    我的 pip 要求中有很多软件包需要安装 我想并行处理它们 我知道 例如 如果我想要n并行作业来自make我必须写make j n 是否有满足 pip 要求的等效命令 Thanks 有时 pip 使用 make 来构建依赖项 如果在开始之前
  • Spark Streaming 中是否需要检查点

    我注意到 Spark 流示例也有检查点代码 我的问题是检查点有多重要 如果是为了容错 那么在此类流应用程序中发生故障的频率是多少 这一切都取决于您的用例 假设您正在运行一个流作业 它仅从 Kafka 读取数据并计算记录数 如果您的应用程序在
  • 子进程调用,它们是并行完成的吗?

    我一直在谷歌搜索这个问题的答案 但似乎没有一个答案 谁能告诉我如果subprocess模块是否并行调用 Python 文档建议它可用于生成新进程 但没有提及它们是否并行 如果它们可以并行完成 您能否给我举一个例子或将我链接到一个例子 这取决
  • AsyncTask的并行执行

    An 异步任务单击时执行 List
  • 异步任务中止时会发生什么?

    锈有async可以绑定到的方法Abortable https docs rs futures preview 0 3 0 alpha 19 futures future struct Abortable html期货 文档说 当中止时 未来
  • 获取 Future 对象的进度的能力

    参考 java util concurrent 包和 Future 接口 我注意到 除非我弄错了 只有 SwingWorker 实现类才能启动冗长的任务并能够查询进度 这就引出了以下问题 有没有办法在非 GUI 非 Swing 应用程序 映
  • Scala 中的行聚合

    我正在寻找一种方法在 Scala 的数据框中获取一个新列来计算min max中的值col1 col2 col10对于每一行 我知道我可以使用 UDF 来做到这一点 但也许有一种更简单的方法 Thanks Porting 这个Python答案
  • Scala 将递归有界类型参数(F 界)转换为类型成员

    我将如何转换 trait Foo A lt Foo A 给类型成员 也就是说 我想要以下内容 trait Foo type A lt Foo type A 但我遇到了困难 因为名称 A 已在类型细化中使用 这个问题是类似的 并衍生自 通过类
  • 错误:无法在 scala 中找到或加载主类

    安装 eclipse scala 插件和 eclipse maven scala 插件后 我是 scala 新手 所以我尝试确保在测试 scala hello world 项目后环境正常工作 它按预期工作 但我在尝试执行我从公司存储库中签出
  • 如何在 Spark 数据帧 groupBy 中执行 count(*)

    我的目的是做相当于基本sql的事情 select shipgrp shipstatus count cnt from shipstatus group by shipgrp shipstatus 我见过的 Spark 数据帧的示例包括其他列
  • 将 DOCTYPE 添加到 Scala XML 的最简单方法?

    我怎样才能在 Scala XML 中制作这个最小的 HTML5 p p 当然 在 Scala 中制作类似 HTML 的 XML 很简单 gt val html p p html scala xml Elem p p 但是 我怎样才能注入DO
  • SBT插件——编译前执行自定义任务

    我刚刚编写了我的第一个 SBT 自动插件 它有一个生成设置文件的自定义任务 如果该文件尚不存在 当显式调用任务时 一切都会按预期工作 但我希望在使用插件编译项目之前自动调用它 无需项目修改其 build sbt 文件 有没有办法实现这一点
  • Scala apply 方法调用,因为括号与隐式参数冲突

    Cay Horstmann 的书 Scala for the Impressive 中有一段关于 apply 方法的注释 有时 表示法会与另一个 Scala 功能发生冲突 隐式参数 例如 表达式 Bonjour sorted 3 产生错误
  • Scala 隐式转换范围问题

    采取这个代码 class Register var value Int 0 def getZeroFlag Boolean value 0x80 0 object Register implicit def reg2int r Regist
  • 在 Scala 中反转地图的优雅方法

    目前正在学习Scala 需要反转Map 来进行一些反转值 gt 键查找 我一直在寻找一种简单的方法来做到这一点 但只想到了 Map origMap map kvp gt kvp 2 gt kvp 1 有人有更优雅的方法吗 假设值是唯一的 则
  • 如何在scala中生成n-gram?

    我正在尝试在 scala 中编写基于 n gram 的分离新闻算法 如何为大文件生成 n gram 例如 对于包含 蜜蜂是蜜蜂中的蜜蜂 的文件 首先它必须选择一个随机的 n 元语法 例如 蜜蜂 然后它必须寻找以 n 1 个单词开头的 n 元
  • 在 Scala 中创建任意类作为 monad 实例

    为了使任何东西都可以在 monad 上下文中操作 如果使用 Haskell 我只需在任何地方为给定类型添加类 Monad 的实现 所以我根本不接触数据类型定义的来源 像 人造的东西 data Z a MyZLeft a MyZRight a

随机推荐