在 Akka 中等待多个结果

2024-03-11

在 Akka 中等待多个 actor 结果的正确方法是什么?

The 反应式编程原理 https://www.coursera.org/course/reactiveCoursera 课程有一个使用复制键值存储的练习。在不详细讨论分配的细节的情况下,它需要等待多个参与者的确认才能表明复制已完成。

我使用包含未完成请求的可变映射来实现分配,但我觉得该解决方案有“难闻的气味”。我希望有更好的方法来实现看似常见的场景。

为了通过保留我的练习解决方案来维护班级的荣誉准则,我有一个描述类似问题的抽象用例。

发票行项目需要计算其应纳税额。纳税义务是跨多个税务机关(例如联邦、州、警察区)应用于该行项目的所有税费的组合。如果每个税务机关都是能够确定行项目纳税义务的参与者,则行项目将需要所有参与者进行报告,然后才能继续报告总体纳税义务。在 Akka 中完成此场景的最佳/正确方法是什么?


这是我相信您正在寻找的一个简化示例。它展示了像actor这样的master如何生成一些子worker,然后等待它们的所有响应,处理等待结果可能发生超时的情况。该解决方案展示了如何等待初始请求,然后在等待响应时切换到新的接收函数。它还展示了如何将状态传播到等待的接收函数中,以避免在实例级别具有显式的可变状态。

object TaxCalculator {
  sealed trait TaxType
  case object StateTax extends TaxType
  case object FederalTax extends TaxType
  case object PoliceDistrictTax extends TaxType
  val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)

  case class GetTaxAmount(grossEarnings:Double)
  case class TaxResult(taxType:TaxType, amount:Double)  

  case class TotalTaxResult(taxAmount:Double)
  case object TaxCalculationTimeout
}

class TaxCalculator extends Actor{
 import TaxCalculator._
 import context._
 import concurrent.duration._

  def receive =  waitingForRequest

  def waitingForRequest:Receive = {
    case gta:GetTaxAmount =>
      val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
      children foreach (_ ! gta)
      setReceiveTimeout(2 seconds)
      become(waitingForResponses(sender, AllTaxTypes))
  }

  def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
    case TaxResult(tt, amount) =>
      val newTaxes = taxes ++ Map(tt -> amount)
      if (newTaxes.keySet == expectedTypes){
        respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_))
        context stop self
      }
      else{
        become(waitingForResponses(respondTo, expectedTypes, newTaxes))
      }

    case ReceiveTimeout =>
      respondTo ! TaxCalculationTimeout
      context stop self
  }

  def propsFor(taxType:TaxType) = taxType match{
    case StateTax => Props[StateTaxCalculator]
    case FederalTax => Props[FederalTaxCalculator]
    case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
  }  
}

trait TaxCalculatingActor extends Actor{  
  import TaxCalculator._
  val taxType:TaxType
  val percentage:Double

  def receive = {
    case GetTaxAmount(earnings) => 
      val tax = earnings * percentage
      sender ! TaxResult(taxType, tax)
  }
}

class FederalTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.FederalTax
  val percentage = 0.20
}

class StateTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.StateTax
  val percentage = 0.10
}

class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.PoliceDistrictTax
  val percentage = 0.05
}

然后您可以使用以下代码对此进行测试:

import TaxCalculator._
import akka.pattern.ask
import concurrent.duration._
implicit val timeout = Timeout(5 seconds)

val system = ActorSystem("taxes")
import system._
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
  case util.Success(TotalTaxResult(amount)) =>
    println(s"Got tax total of $amount")
  case util.Success(TaxCalculationTimeout) =>
    println("Got timeout calculating tax")
  case util.Failure(ex) => 
    println(s"Got exception calculating tax: ${ex.getMessage}")
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Akka 中等待多个结果 的相关文章

  • 非阻塞 TCP 套接字并在发送后立即刷新?

    我正在为我的应用程序使用 Windows 套接字 winsock2 h 由于阻塞套接字不允许我控制连接超时 因此我使用非阻塞套接字 发送命令后 我正在使用关闭命令来刷新 我必须这样做 我的超时是50ms 我想知道如果要发送的数据这么大 是否
  • Scala + Play Framework + Slick + Akka - 来自 Akka Actor 的数据库访问

    在收到 TCP 特定消息时 我需要查询我的数据库 为此 我创建了一个名为 DBActor 的 Actor 并将其加载到Application scala file class Application Inject system ActorS
  • Akka TestProbe 测试 context.watch() / 终止处理

    我正在使用 TestKit 测试 akka 系统 我正在测试的系统的一个参与者在收到某种消息类型后 context watches 发送者 并在发送者死亡时自杀 trait Handler extends Actor override de
  • akka java 编程覆盖配置

    我能找到的几个关于此问题的主题都是针对 Scala 而不是 Java 而且没有一个涉及远程参与者 我有一个基本配置文件 SERVER CONFIG FILE Include akka common TheSystem akka actor
  • 用于处理接收中的异步操作的 Akka 模式

    我有一个 Actor 它接收指标数据点并定期聚合并将它们保存到磁盘 后一个操作执行 I O 因此我不想使用阻塞操作 但是 如果我将其切换为异步 如何防止在聚合完成之前接收其他数据点而不阻塞某处 我见过的一种模式是使用Stash 像这样 cl
  • 考虑到可扩展性的基于组件的应用程序:OSGi 还是 Akka?

    在我的硕士论文中 我正在开发一个用于销售大型活动门票的应用程序框架 我的主要要求是可修改性 可扩展性和性能 我的客户 活动组织者 应该能够在运行时轻松替换组件并添加功能 此类组件的一个示例是座位分配组件 我的导师说看看 OSGi 松散耦合的
  • 为 testProbe 提供 ActorPath

    我有一些代码使用 ActorPath 而不是 ActorRef 向演员发送消息 system actorSelection user myActor a message 我需要确保这条消息确实被发送了 因此 我需要在测试中创建一个位于此 a
  • 持久 Akka 邮箱和无损

    在 Akka 中 当一个 actor 在处理消息时死亡 内部onReceive 该消息丢失 有没有办法保证无损 有没有办法配置 Akka 始终保留消息before将他们发送到onReceive 以便在演员死亡时可以恢复并重播 也许像持久邮箱
  • 如何使用 Akka HTTP 从多个参与者/Web 处理程序正确调用单个服务器?

    我有一个服务 我们称之为服务 A 它使用 Akka Server HTTP 来处理传入请求 我还有第 3 方应用程序 服务 B 它提供了多种 Web 服务 服务 A 的目的是转换客户端请求 调用服务 B 的一个或多个 Web 服务 合并 转
  • perl,使用 IO::Select 和 IO::Socket::INET 读取阻塞

    该服务器工作正常 但如果我这样做 bash echo n abcd sleep 50 echo efgh 数控本地主机 9090 服务器阻塞 50 秒 在我的完整代码中 我有不止一个IO Select INET 我有另一个套接字侦听其他端口
  • Akka 流如何不断实现?

    我在用阿卡流 http doc akka io docs akka stream and http experimental 1 0 scala stream index html在 Scala 中进行轮询AWS SQS https aws
  • Akka Actor 询问和类型安全

    我如何使用 Akka Actor Ask 并维护类型安全 或者避免使用 询问 而使用 告诉 打电话时 or ask在 Akka Actor 上 Future Any 返回 我必须通过进行显式转换future mapTo MyType 我不喜
  • 理解 scala 中参与者的线程性

    有人告诉我 Scala Actors 实际上从来不会同时执行两个操作 这表明 act 或 React 或 receive 方法本质上是同步的 我知道 act 方法中的长操作可能会导致阻塞问题 并且我假设对消息队列的访问必须以某种方式同步 但
  • 为 WinSocks 和 *nix 制作非阻塞套接字

    在 C C 中 如何将 WinSocks 和 nix 中的阻塞套接字转换为非阻塞套接字 这样 select 就能正常工作 您可以将预处理器用于特定于平台的代码 在Linux上 fcntl fd F SETFL O NONBLOCK Wind
  • Akka Testkit 错误且版本不明确

    我正在使用 Akka testkit 库来测试我的一些演员 这是我的 build sbt 中的内容 com typesafe akka akka testkit 2 6 10 Test 这是我得到的错误 java lang IllegalS
  • 如何解析 Spray-routing 中的 get 请求参数?

    这就是代码部分的样子 get respondWithMediaType MediaTypes application json entity as HttpRequest obj gt complete println obj ok 我可以
  • 玩 Scala Akka WebSockets 更改 actor 路径

    我遵循使用 Scala Play 和 Akka Actor 创建 Web 套接字的示例 https www playframework com documentation 2 5 x ScalaWebSockets Handling Web
  • 知道 akka actor 何时完成

    有几个人和我一起从事一个项目 一直在试图找出解决这个问题的最佳方法 看起来这应该是经常需要的标准东西 但由于某种原因我们似乎无法得到正确的答案 如果我有一些工作要做 并且我向路由器抛出一堆消息 我如何知道所有工作何时完成 例如 如果我们正在
  • 具有内部赋值延迟的阻塞和非阻塞语句之间的区别

    以下 2 个 verilog 代码片段有什么区别 1 always in out 5 in AND 2 always in out lt 5 in 考虑到always块中不存在其他行 输出会有什么不同吗 问题参考幻灯片 16 参见 o5 和
  • 演员邮箱溢出。斯卡拉

    我目前正在与 scala 的两位演员合作 一 producer 产生一些数据并将其发送到parcer 生产者发送一个HashMap String HashMap Object List Int 通过消息 以及this标记发件人 parcer

随机推荐

  • pycharm看不到python3.7解释器

    我在 Linux Mint 19 Tara Xfce 上使用 Pycharm Community 2018 1 4 它与 Python 3 5 解释器配合良好 我安装了 Python 3 7 作为系统上默认的 Python 解释器 pyth
  • 修改LD_LIBRARY_PATH

    在 UNIX 中 我正在尝试修改LD LIBRARY PATH为了使其包含一些库 我无法添加到系统的常规库中 我没有根权限 它是大学服务器 此时我已经使用了很多我在网络上找到的方法执行此操作但无法修改它 我也看不到是什么PATH 我尝试过使
  • 反转 ResourceManager

    如果执行 ResourceManager GetString Key 则可以获得资源中项目的值 有没有办法进行反向查找以从给定值的资源中获取密钥 本质上是反翻译 您应该能够获取 ResourceSet 并迭代它的值 如果它们相等则返回键 请
  • CUDA 标量和 SIMD 视频指令的效率

    SIMD指令的吞吐量低于32位整数运算 如果是 SM2 0 仅标量指令版本 则低 2 倍 如果是 SM3 0 则低 6 倍 什么情况下适合使用它们 如果您的数据已经以 SIMD 视频指令本机处理的格式打包 则需要多个步骤对其进行解包 以便可
  • 如何让树视图重新考虑是否需要水平滚动条?

    考虑以下非常简单的单元 Unit1 pas unit Unit1 interface uses Windows Classes Controls Forms ComCtrls type TForm1 class TForm TreeView
  • 使用内部/外部传播包

    我想在我的模拟中放置一个 系统 组件 类似于Modelica Fluid System and Modelica Mechanics MultiBody World 所有其他组件都可以从中访问Medium包 以便在整个流程图中仅设置一次工作
  • matplotlib 交互式绘图(在图表上手动绘制线条)

    我已经使用 matplotlib 成功绘制了一组日期排序数据 X 轴是日期 但是 我希望能够manually在绘制的图表上从一个 date1 y1 到另一个 date2 y2 绘制线条 我似乎找不到任何例子来说明如何做到这一点 或者实际上是
  • 如何为 Ionic 4(android / ios)应用程序创建 google pay?

    我使用以下方法创建了 google pay for web URL Google 网络付费 https developers google com pay api web guides tutorial 我想为 ionic 4 应用程序创建
  • 转换为 UCS2

    Vb net 或C 中是否有任何函数可以对UCS2中的字符串进行编码 Thanks 使用以下函数以 UCS2 格式对 unicode 字符串进行编码 gt Used to encoding GSM message as UCS2 publi
  • Data.table:连接 ID 和日期键,但希望第一个表中的日期键之前(或等于)最接近的日期

    我真的很感谢对这个问题的一些帮助 我找不到足够接近的例子 我有两个 data tables 第一个称为customer table 包含特定于时间戳 AsOfDate 第二个表称为activity table描述发送给该客户的营销活动Act
  • MonoDevelop - 转换行结束对话框

    我经常在 Linux 上的 MonoDevelop 和具有相同代码库的 VS2010 之间切换 因此我永远不得不单击不转换行结尾 MonoDevelop 中是否有一个选项可以设置默认答案以防止弹出此对话框 找到了 编辑 gt 首选项 gt
  • 无法更改 root 帐户的密码 ansible

    我有以下错误 failed true invocation module args append privs false check implicit admin true config file root my cnf connect t
  • Unicorn Rails - 在生产模式下启动时占用 100% CPU

    我们使用 Unicorn Rails nginx 它在我的系统中的开发模式和生产模式下运行良好 4GB RAM Intel R Core TM 2 Duo CPU P8600 2 40GHz 我可以在本地系统中启动 10 个工作人员 但在生
  • 如何使用Android传感器计算旋转角度?

    我正在做一个 Opengl 应用程序 如果 Android 设备沿 Z 轴旋转 倾斜 我必须旋转相机 我尝试过SensorManager getOrientation R orientVals 使用磁性和加速度传感器 但数值波动很大 我的设
  • UpgradeableApp API 不断给出“无效的 OAuth 消费者密钥”

    给出以下 ruby 代码 consumer OAuth Consumer new consumer key consumer secret site https www googleapis com resp consumer reques
  • 在python中获取方阵的上三角形或下三角形的所有元素

    numpy scipy 中是否有一个函数可以返回方阵的一个三角形 上或下 的所有元素 e g matrix 1 2 3 4 5 6 7 8 9 三角形 上 下 up 1 2 3 5 6 9 down 1 4 5 7 8 9 or up 1
  • 如何使画布文本可选择?

    任何建议都将受到高度赞赏 文本选择有很多组成部分 有些是视觉的 有些是非视觉的 首先 使文本可选择 您必须保留一个数组 其中包含文本的位置 文本是什么以及使用的字体 您将通过 Canvas 函数measureText 使用此信息 通过使用m
  • Elasticsearch:从索引中删除重复项

    我有一个包含多个重复条目的索引 它们具有不同的 id 但其他字段具有相同的内容 例如 id 1 content content1 id 2 content content1 id 3 content content2 id 4 conten
  • 使用 Func 时如何调用 invoke

    在函数中Test Func
  • 在 Akka 中等待多个结果

    在 Akka 中等待多个 actor 结果的正确方法是什么 The 反应式编程原理 https www coursera org course reactiveCoursera 课程有一个使用复制键值存储的练习 在不详细讨论分配的细节的情况