我正在尝试将大型 Scala + Akka + PlayMini 应用程序与外部 REST API 连接起来。这个想法是定期轮询(基本上每 1 到 10 分钟)根 URL,然后爬取子级 URL 以提取数据,然后将其发送到消息队列。
我想出了两种方法来做到这一点:
1st way
创建参与者的层次结构以匹配 API 的资源路径结构。在谷歌纵横的例子中,这意味着,例如
- Actor 'latitude/v1/currentLocation' 进行民意调查https://www.googleapis.com/latitude/v1/currentLocation https://www.googleapis.com/latitude/v1/currentLocation
- 演员“纬度/v1/位置”民意调查https://www.googleapis.com/latitude/v1/location https://www.googleapis.com/latitude/v1/location
- 演员“latitude/v1/location/1”民意调查https://www.googleapis.com/latitude/v1/location/1 https://www.googleapis.com/latitude/v1/location/1
- 演员“latitude/v1/location/2”民意调查https://www.googleapis.com/latitude/v1/location/2 https://www.googleapis.com/latitude/v1/location/2
- 演员“latitude/v1/location/3”民意调查https://www.googleapis.com/latitude/v1/location/3 https://www.googleapis.com/latitude/v1/location/3
- etc.
在这种情况下,每个参与者负责定期轮询其关联资源,并为下一级路径资源创建/删除子参与者(即参与者“纬度/v1/位置”为以下资源创建参与者 1、2、3 等)它通过轮询了解的所有位置https://www.googleapis.com/latitude/v1/location https://www.googleapis.com/latitude/v1/location).
2nd way
创建一个相同的轮询参与者池,这些参与者接收由路由器负载平衡的轮询请求(包含资源路径),轮询一次 URL,进行一些处理,并安排轮询请求(针对下一级资源和轮询的 URL) 。在谷歌纵横中,这意味着:
1 个路由器,n 个轮询参与者。初始轮询请求https://www.googleapis.com/latitude/v1/location https://www.googleapis.com/latitude/v1/location导致几个新的(立即)轮询请求https://www.googleapis.com/latitude/v1/location/1 https://www.googleapis.com/latitude/v1/location/1, https://www.googleapis.com/latitude/v1/location/2 https://www.googleapis.com/latitude/v1/location/2等以及对同一资源的一个(延迟的)轮询请求,即https://www.googleapis.com/latitude/v1/location https://www.googleapis.com/latitude/v1/location.
我已经实现了这两种解决方案,但无法立即观察到任何相关的性能差异,至少对于我感兴趣的 API 和轮询频率而言是这样。我发现第一种方法更容易推理,并且可能更容易与系统一起使用.scheduler.schedule(...) 比第二种方法(我需要 ScheduleOnce(...))。此外,假设资源嵌套在多个级别并且寿命有些短暂(例如,在每次轮询之间可能会添加/删除多个资源),akka 的生命周期管理使得在第一种情况下很容易杀死整个分支。第二种方法(理论上)应该更快,并且代码更容易编写。
我的问题是:
- 哪种方法似乎是最好的(在性能、可扩展性、代码复杂性等方面)?
- 您认为这两种方法(尤其是第一种)的设计有什么问题吗?
- 有没有人尝试过实施类似的东西?是怎么做到的?
Thanks!
为什么不创建一个主轮询器,然后按计划启动异步资源请求?
我不是使用 Akka 的专家,但我尝试了一下:
循环访问要获取的资源列表的 poller 对象:
import akka.util.duration._
import akka.actor._
import play.api.Play.current
import play.api.libs.concurrent.Akka
object Poller {
val poller = Akka.system.actorOf(Props(new Actor {
def receive = {
case x: String => Akka.system.actorOf(Props[ActingSpider], name=x.filter(_.isLetterOrDigit)) ! x
}
}))
def start(l: List[String]): List[Cancellable] =
l.map(Akka.system.scheduler.schedule(3 seconds, 3 seconds, poller, _))
def stop(c: Cancellable) {c.cancel()}
}
异步读取资源并触发更多异步读取的参与者。如果更友善的话,您可以将消息发送安排在时间表上,而不是立即致电:
import akka.actor.{Props, Actor}
import java.io.File
class ActingSpider extends Actor {
import context._
def receive = {
case name: String => {
println("reading " + name)
new File(name) match {
case f if f.exists() => spider(f)
case _ => println("File not found")
}
context.stop(self)
}
}
def spider(file: File) {
io.Source.fromFile(file).getLines().foreach(l => {
val k = actorOf(Props[ActingSpider], name=l.filter(_.isLetterOrDigit))
k ! l
})
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)