Akka 整合并发数据库请求

2023-11-29

我希望能够向多个数据存储库发出并发请求并合并结果。我试图了解我的方法是否有效,或者是否有更好的方法来解决这个问题。我绝对是 Akka / Spray / Scala 的新手,并且真的想更好地了解如何正确构建这些组件。任何建议/提示将不胜感激。试图让我的头脑围绕在这种类型的实现中使用参与者和未来。

喷涂服务:

trait DemoService extends HttpService with Actor with ActorLogging {
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val mongoMasterActor = context.actorOf(Props[MongoMasterActor], "redisactor")
  val dbMaster = context.actorOf(Props[DbMasterActor], "dbactor")

  val messageApiRouting =
        path("summary" / Segment / Segment) { (dataset, timeslice) =>
          onComplete(getDbResponses(dbMaster, dataset, timeslice)) {
            case Success(dbMessageResponse) => complete(s"The result was $dbMessageResponse")
            case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
          }
        }

  /** Passes the desired actor reference for a specific dataset and timeslice for summary data retrieval
    *
    * @param mongoActor an actor reference to the RedisActor that will handle the appropriate request routing
    * @param dataset The dataset for which the summary has been requested
    * @param timeslice The timeslice (Month, Week, Day, etc.) for which the summary has been requested
    */
  def getSummary(mongoActor: ActorRef, dataset: String, timeslice: String): Future[DbMessageResponse] = {
    log.debug(s"dataset: $dataset  timeslice: $timeslice")
    val dbMessage = DbMessage("summary", dataset + timeslice)
    (mongoActor ? dbMessage).mapTo[DbMessageResponse]
  }

  def getDbResponses(dbActor: ActorRef, dataset: String, timeslice: String): Future[SummaryResponse] = {
    log.debug(s"dataset: $dataset  timeslice: $timeslice")
    val dbMessage = DbMessage("summary", dataset + timeslice)
    (dbActor ? dbMessage).mapTo[SummaryResponse]
  }

  def getSummaryPayload(mongoSummary: DbMessageResponse, redisSummary: DbMessageResponse): String = {
    mongoSummary.response + redisSummary.response
  }

}

Akka Actor / 未来的模拟数据库请求:

class DbMasterActor extends Actor with ActorLogging {


  private var originalSender: ActorRef = _

  //TODO: Need to add routing to the config to limit instances
  val summaryActor = context.actorOf(Props(new SummaryActor), "summaryactor")

  def receive = {

    case msg: DbMessage => {
      this.originalSender = sender
      msg.query match {
        case "summary" => {

          getDbResults().onComplete{
            case Success(result) => originalSender ! result
            case Failure(ex) => log.error(ex.getMessage)

          }
        }
      }
    }

    //If not match log an error
    case _ => log.error("Received unknown message: {} ")

  }


  def getDbResults(): Future[SummaryResponse] = {
    log.debug("hitting db results")
    val mongoResult = Future{ Thread.sleep(500); "Mongo"}
    val redisResult = Future{ Thread.sleep(800); "redis"}

    for{
      mResult <- mongoResult
      rResult <- redisResult

    } yield SummaryResponse(mResult, rResult)

  }
}

在阅读 Jamie Allen 的《Effective Akka》之后,我将尝试应用他的“Cameo”模式建议。

幻灯片分享:http://www.slideshare.net/shinolajla/ effective-akka-scalaio

Github: https://github.com/jamie-allen/ effective_akka

我认为我创建的方法会起作用,但根据杰米在演讲中的评论,听起来并不是最好的方法。我将更新/编辑回这篇文章我已经实现(或尝试)的内容。

概要演员(客串演员):

object SummaryResponseHandler {
  case object DbRetrievalTimeout

  def props(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef): Props = {
    Props(new SummaryResponseHandler(mongoDb, redisDb, originalSender))
  }
}

class SummaryResponseHandler(mongoDb: ActorRef, redisDb: ActorRef,
                             originalSender: ActorRef) extends Actor with ActorLogging {

  import SummaryResponseHandler._
  var mongoSummary, redisSummary: Option[String] = None
  def receive = LoggingReceive {
    case MongoSummary(summary) =>
      log.debug(s"Received mongo summary: $summary")
      mongoSummary = summary
      collectSummaries
    case RedisSummary(summary) =>
      log.debug(s"Received redis summary: $summary")
      redisSummary = summary
      collectSummaries
    case DbRetrievalTimeout =>
      log.debug("Timeout occurred")
      sendResponseAndShutdown(DbRetrievalTimeout)
  }

  def collectSummaries = (mongoSummary, redisSummary) match {
    case (Some(m), Some(r)) =>
      log.debug(s"Values received for both databases")
      timeoutMessager.cancel
      sendResponseAndShutdown(DataSetSummary(mongoSummary, redisSummary))
    case _ =>
  }

  def sendResponseAndShutdown(response: Any) = {
    originalSender ! response
    log.debug("Stopping context capturing actor")
    context.stop(self)
  }

  import context.dispatcher
  val timeoutMessager = context.system.scheduler.scheduleOnce(
    250 milliseconds, self, DbRetrievalTimeout)
}

class SummaryRetriever(mongoDb: ActorRef, redisDb: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case GetSummary(dataSet) =>
      log.debug("received dataSet")
      val originalSender = sender
      val handler = context.actorOf(SummaryResponseHandler.props(mongoDb,redisDb, originalSender), "cameo-message-handler")
      mongoDb.tell(GetSummary(dataSet), handler)
      redisDb.tell(GetSummary(dataSet), handler)
    case _ => log.debug(s"Unknown result $GetSummary(datset)")
  }

}

Common:

case class GetSummary(dataSet: String)
case class DataSetSummary(
   mongo: Option[String],
   redis: Option[String]
)

case class MongoSummary(
    summary: Option[String]
                         )

case class RedisSummary(
   summary: Option[String]
                         )

trait MongoProxy extends Actor with ActorLogging
trait RedisProxy extends Actor with ActorLogging

模拟存根:

class MongoProxyStub extends RedisProxy {
  val summaryData = Map[String, String](
    "dataset1" -> "MongoData1",
    "dataset2" -> "MongoData2")

  def receive = LoggingReceive {
    case GetSummary(dataSet: String) =>
      log.debug(s"Received GetSummary for ID: $dataSet")
      summaryData.get(dataSet) match {
        case Some(data) => sender ! MongoSummary(Some(data))
        case None => sender ! MongoSummary(Some(""))
      }
  }
}

class RedisProxyStub extends MongoProxy{
  val summaryData = Map[String, String](
    "dataset1" -> "RedisData1",
    "dataset2" -> "RedisData2")

  def receive = LoggingReceive {
    case GetSummary(dataSet: String) =>
      log.debug(s"Received GetSummary for ID: $dataSet")
      summaryData.get(dataSet) match {
        case Some(data) => sender ! RedisSummary(Some(data))
        case None => sender ! RedisSummary(Some(""))
      }
  }
}

启动(您应该使用测试,但只想从启动运行):

object Boot extends App{

  val system = ActorSystem("DbSystem")

  val redisProxy = system.actorOf(Props[RedisProxyStub], "cameo-success-mongo")
  val mongoProxy = system.actorOf(Props[MongoProxyStub], "cameo-success-redis")
  val summaryRetrieverActor = system.actorOf(Props(new SummaryRetriever(redisProxy, mongoProxy)), "cameo-retriever1")

  implicit val timeout = Timeout(5 seconds)
  val future = summaryRetrieverActor ? GetSummary("dataset1")
  val result = Await.result(future, timeout.duration).asInstanceOf[DataSetSummary]
  println(Some(result.mongo).x)
  println(result.redis)

  system.shutdown()

}

应用程序配置:

akka.loglevel = "DEBUG"
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
akka.actor.debug.autoreceive = on
akka.actor.debug.lifecycle = on
akka.actor.debug.receive = on
akka.actor.debug.event-stream = on
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Akka 整合并发数据库请求 的相关文章

  • 避免函数内装箱/拆箱

    对于数字密集型代码 我编写了一个具有以下签名的函数 def update f Int Int Double gt Double Unit 然而 因为Function3不是专门的 每个应用程序f结果对 3 个参数和结果类型进行装箱 拆箱 我可
  • Scala:为什么 Actor 是轻量级的?

    是什么让演员如此轻盈 我什至不确定它们是如何工作的 它们不是单独的线程吗 当他们说轻量级时 他们的意思是每个参与者都没有映射到单个线程 JVM 提供共享内存线程 锁作为主要形式 并发抽象 但分享了 内存线程是相当重量级的 并招致严重的绩效处
  • 如何插入UUID的值?

    我在 Play Framework 2 3 支持的 postgresql 9 4 中使用 anorm 2 4 给出一个这样的模型 case class EmailQueue id UUID send from String send to
  • collect_list() 是否保持行的相对顺序?

    想象一下我有以下 DataFrame df id featureName featureValue id1 a 3 id1 b 4 id2 a 2 id2 c 5 id3 d 9 想象一下我运行 df groupBy id agg coll
  • 何时使用本地演员和远程演员?

    我什么时候应该在 Akka 中使用 Actors 和 Remote Actors 据我所知 两者都可以扩展机器 但只有远程 Actor 可以扩展 那么普通 Actor 有任何实际的生产用途吗 如果远程 Actor 仅具有较小的初始设置开销
  • 为什么流式数据集会失败并显示“当流式数据帧/数据集上存在流式聚合时不支持完整输出模式...”?

    我使用 Spark 2 2 0 在 Windows 上使用 Spark 结构化流时出现以下错误 有时不支持完整输出模式streaming aggregations on streaming DataFrames DataSets没有wate
  • Scala+Slick 3:将一个查询的结果插入到另一张表中

    这个问题是关于 slick 3 0 或 3 1 的 我对此很灵活 我有一个中间查询 我用它来处理map for等等以获得我想要的结果 最后我有一个 val foo DBIOAction Seq MySchema Bar NoStream E
  • 类型不匹配;发现:长 需要:Int

    我有一个应该返回 Long 的方法 但我收到一个错误 type mismatch found Long required Int 方法如下 def getRandom IMEI from Long to Long Long if from
  • 组合部分函数

    我有两个偏函数f and g 它们没有副作用并且执行速度快 将它们组合成另一个部分函数的最佳方法是什么h这样h isDefinedAt x iff f isDefinedAt x g isDefinedAt f x 如果h是一个返回一个函数
  • 如何识别远程参与者?

    我有一个远程参与者 客户端 它正在向另一个远程参与者 服务器 注册 然后注销 使用关闭挂钩 然而 虽然服务器接收到注销 但实际sender财产是一个不同的 Channel 对象 所以在我的服务器日志中我有 Registered new cl
  • scala.concurrent.blocking - 它实际上做了什么?

    我花了一段时间学习 Scala 执行上下文 底层线程模型和并发性的主题 你能解释一下通过什么方式吗scala concurrent blocking 调整运行时行为 and 可以提高性能或避免死锁 如中所述scaladoc http www
  • scala 中的模拟案例类:Mockito

    在我的游戏应用程序中 我打算模拟一个案例类 我可以这样做 但它创建了一个所有成员变量都为空的对象 有没有办法创建案例类的模拟对象 以便该对象可以初始化一些成员 case class User name String address Stri
  • 在 EB 上的 Docker 中运行的应用程序拒绝连接到自身

    我有一个 Play 2 Web 应用程序 我使用 Docker 将其部署到 Elastic Beanstalk 在此 Web 应用程序中 我启动了一个 Akka 集群 启动过程涉及将自动伸缩组中的所有节点添加为种子节点 包括其自身 第一次部
  • 将额外的参数传递给多态函数?

    我有一个多态函数 可以将列表转换为集合 import shapeless PolyDefns gt import shapeless val lists List 1 2 List A B List 1 1 2 2 HNil object
  • 为什么 Cassandra 客户端在生产中没有 epoll 时会失败? [复制]

    这个问题在这里已经有答案了 当我在本地运行服务时 我收到一条警告 指出 epoll 不可用 因此它使用 NIO 很公平 当我将其部署到 Kubernetes 中时 我得到了以下信息 这导致服务无法运行 2017 03 29T19 09 22
  • 用 HashMap[Int, Vector[Int]] (Scala) 表示图(邻接列表)?

    我想知道如何 如果可能的话 我可以通过以下方式制作 可变 图的邻接列表表示HashMap Int Vector Int HashMap当然是可变的 目前我将其设置为HashMap Int ArrayBuffer Int 但我可以更改 Arr
  • 如何在 akka actor 中测试公共方法?

    我有一个 akka 演员 class MyActor extends Actor def recieve def getCount id String Int do a lot of stuff proccess id do more st
  • Spark Streaming 中是否需要检查点

    我注意到 Spark 流示例也有检查点代码 我的问题是检查点有多重要 如果是为了容错 那么在此类流应用程序中发生故障的频率是多少 这一切都取决于您的用例 假设您正在运行一个流作业 它仅从 Kafka 读取数据并计算记录数 如果您的应用程序在
  • 带可变参数的 Spark UDF

    如文档中所示 列出最多 22 个参数是唯一的选择吗 https spark apache org docs 1 5 0 api scala index html org apache spark sql UDFRegistration ht
  • Scala REPL / SBT Console 是否有配置文件?

    我一直在尝试找到某种点文件来放入 Scala REPL 设置和自定义函数 我特别有兴趣传递它的标志 例如 Dscala color 启用语法突出显示 以及覆盖设置 如结果字符串截断 scala gt power scala gt vals

随机推荐

  • ASP.NET Core 5.0 JWT 身份验证抛出 401 代码 [重复]

    这个问题在这里已经有答案了 我有一个使用 JWT 身份验证的 ASP NET Core 5 0 API 目前我想要它做的就是读取按钮中的令牌 Html ActionLink Test Oper Home 这是 Authorize 标题并根据
  • IBM Worklight - 我们可以创建基于桌面的应用程序吗?

    我们可以使用 Worklight 创建桌面应用程序吗 例如 我们可以使用 Adob e AIR Titanium RCP 等创建基于桌面的应用程序 我们可以使用 Worklight 开发什么类型的应用程序 如果单击 Worklight 图标
  • 加特林不在会话中存储值

    我在加特林中使用会话来存储值 如下所示 exec session gt val id Instant now toEpochMilli toString scala util Random nextInt 1000 toString ses
  • 将 MKMapView 更新为从 CLGeocoder 返回的 CLPlacemark

    我希望能够通过允许用户在 UIAlertView 中输入地址或位置来更新 MKMapView 上显示的区域 我目前有 if geocoder geocoding geocoder cancelGeocode geocoder geocode
  • 无法更新数据属性值

    虽然网上有一些这样的例子 但它似乎不能正常工作 我无法弄清楚问题所在 我有这个简单的 HTML div a href change data value a 每次我单击 更改数据值 链接时 我想更新 data num 的数据值 例如 我需要
  • 如何使用 Javascript/XPCOM 作为 Windows“运行...”打开 .EXE?

    我有一个 Intranet Web 应用程序 需要运行一些外部应用程序 例如 Word 记事本和其他特定应用程序 我的代码允许使用 IE ActiveX 和 Firefox XPCOM 进行访问 当我使用整个路径 如 C windows n
  • 使用 Bootstrap 3 的 Asp 单选按钮使它们显示为切换按钮

    我有 4 个 asp 单选按钮 我希望它们看起来像切换按钮 但仍充当 asp 单选按钮 主要是 我需要在单击按钮时触发 CheckedChanged 事件 目前 这就是我的 HTML div class col xs 10 btn grou
  • String 类中的什么方法只返回前 N 个字符?

    我想写一个扩展方法String类 以便如果输入字符串长于提供的长度N 只有第一个N要显示的字符 它看起来是这样的 public static string TruncateLongString this string str int max
  • 从 onUpdate 更改小部件的图标

    我有一个小部件 每次收到更新广播时都会更改其图标 但是 该小部件始终无法正确显示其图标 并显示文本 加载小部件时出现问题 Logcat 消息是 WARN AppWidgetHostView 612 updateAppWidget could
  • MISRA C++-2008 规则 5-0-15 - 数组索引应是指针算术的唯一形式

    我需要有更多 MISRA 经验的人来帮助我解决这个问题 我有以下代码 byte buf new std nothrow byte bufferSize for uint32 t i 0 i lt bufferSize i 4 buf i 0
  • 如何在 C++ 中不使用 OLE DB API 查询 MS SQL Compact Server 3.5 数据库?

    我有 MS SQL Compact Server 3 5 的 dll 和包含文件 没有 OLE DB 如何使用它 我只想自己加载 dll 并调用必要的方法 请不要使用 COM 有谁知道API吗 EDIT 如果这是不可能的 是否有一个功能齐全
  • 为什么我在 JSON 中转义引号的尝试不起作用?

    我正在使用 JQuery parseJSON 函数解析 JSON 字符串 就像我在代码中做过很多次一样 不过 在这个特殊情况下 我得到 Uncaught SyntaxError Unexpected token R 在我的 JSON 格式字
  • 从不同视图模型同步可观察量的简单、干净的方法

    假设我有两个视图模型 每个视图模型都有一个可观察的属性 表示不同但相似的数据 function site1Model username this username ko observable username function site2M
  • .net2.0 和 .net4.0 编写的 Web 服务客户端之间的差异

    我在通过 SSL 使用 Java Web 服务时遇到问题 我有两种方法 一种使用 net4 0 一种使用 net2 0 不幸的是 net4 0方法不起作用 但是 早期版本 2 0 可以正常工作 class Program static vo
  • 从 python 字典中打印列

    我有一本一周多提交的字典 我想以每周日历样式的列将它们打印出来 Fri Commit 04 15PM Move flex to mixin and do mobile first queries n Commit 03 52PM use p
  • 如何在 WPF 中平铺和叠加图像?

    我对 WPF 非常陌生 并尝试将应用程序从 VB6 移植到 C 和 XAML 我现在需要做的是用许多小图像创建一个大图像 这些小图像排列成一系列 瓷砖 其中一些较小的将有叠加在其上的覆盖层 在 VB6 中 只需使用 PictureBox 控
  • 在方法内使用扫描仪

    我是编程新手 所以如果有一个非常简单的答案 我深表歉意 但我似乎找不到任何实际的答案 我正在使用扫描仪对象在猜数字游戏中进行用户输入 扫描仪在我的主方法中声明 并将在其他单个方法中使用 但该方法将在各处被调用 我尝试将其声明为静态 但 Ec
  • 在后台播放网页中的声音文件

    我想在我的网页上后台播放声音文件 不希望出现媒体播放器 UI 我的网站将在 Fire Fox 上运行 我使用了 Embed 元素并设置了 Hidden 属性是真实的问题是 除非我删除隐藏属性 否则不会播放任何声音 在这种情况下 声音文件会播
  • React 组件渲染方法无缘无故被调用两次

    import App css import SolarSystem from components solarSystem solarSystem class App extends React Component componentDid
  • Akka 整合并发数据库请求

    我希望能够向多个数据存储库发出并发请求并合并结果 我试图了解我的方法是否有效 或者是否有更好的方法来解决这个问题 我绝对是 Akka Spray Scala 的新手 并且真的想更好地了解如何正确构建这些组件 任何建议 提示将不胜感激 试图让