在 Akka 中等待多个 actor 结果的正确方法是什么?
The 反应式编程原理 https://www.coursera.org/course/reactiveCoursera 课程有一个使用复制键值存储的练习。在不详细讨论分配的细节的情况下,它需要等待多个参与者的确认才能表明复制已完成。
我使用包含未完成请求的可变映射来实现分配,但我觉得该解决方案有“难闻的气味”。我希望有更好的方法来实现看似常见的场景。
为了通过保留我的练习解决方案来维护班级的荣誉准则,我有一个描述类似问题的抽象用例。
发票行项目需要计算其应纳税额。纳税义务是跨多个税务机关(例如联邦、州、警察区)应用于该行项目的所有税费的组合。如果每个税务机关都是能够确定行项目纳税义务的参与者,则行项目将需要所有参与者进行报告,然后才能继续报告总体纳税义务。在 Akka 中完成此场景的最佳/正确方法是什么?
这是我相信您正在寻找的一个简化示例。它展示了像actor这样的master如何生成一些子worker,然后等待它们的所有响应,处理等待结果可能发生超时的情况。该解决方案展示了如何等待初始请求,然后在等待响应时切换到新的接收函数。它还展示了如何将状态传播到等待的接收函数中,以避免在实例级别具有显式的可变状态。
object TaxCalculator {
sealed trait TaxType
case object StateTax extends TaxType
case object FederalTax extends TaxType
case object PoliceDistrictTax extends TaxType
val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)
case class GetTaxAmount(grossEarnings:Double)
case class TaxResult(taxType:TaxType, amount:Double)
case class TotalTaxResult(taxAmount:Double)
case object TaxCalculationTimeout
}
class TaxCalculator extends Actor{
import TaxCalculator._
import context._
import concurrent.duration._
def receive = waitingForRequest
def waitingForRequest:Receive = {
case gta:GetTaxAmount =>
val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
children foreach (_ ! gta)
setReceiveTimeout(2 seconds)
become(waitingForResponses(sender, AllTaxTypes))
}
def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
case TaxResult(tt, amount) =>
val newTaxes = taxes ++ Map(tt -> amount)
if (newTaxes.keySet == expectedTypes){
respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_))
context stop self
}
else{
become(waitingForResponses(respondTo, expectedTypes, newTaxes))
}
case ReceiveTimeout =>
respondTo ! TaxCalculationTimeout
context stop self
}
def propsFor(taxType:TaxType) = taxType match{
case StateTax => Props[StateTaxCalculator]
case FederalTax => Props[FederalTaxCalculator]
case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
}
}
trait TaxCalculatingActor extends Actor{
import TaxCalculator._
val taxType:TaxType
val percentage:Double
def receive = {
case GetTaxAmount(earnings) =>
val tax = earnings * percentage
sender ! TaxResult(taxType, tax)
}
}
class FederalTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.FederalTax
val percentage = 0.20
}
class StateTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.StateTax
val percentage = 0.10
}
class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.PoliceDistrictTax
val percentage = 0.05
}
然后您可以使用以下代码对此进行测试:
import TaxCalculator._
import akka.pattern.ask
import concurrent.duration._
implicit val timeout = Timeout(5 seconds)
val system = ActorSystem("taxes")
import system._
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
case util.Success(TotalTaxResult(amount)) =>
println(s"Got tax total of $amount")
case util.Success(TaxCalculationTimeout) =>
println("Got timeout calculating tax")
case util.Failure(ex) =>
println(s"Got exception calculating tax: ${ex.getMessage}")
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)