我目前正在与 scala 的两位演员合作。一、producer,产生一些数据并将其发送到parcer。生产者发送一个HashMap[String,HashMap[Object,List[Int]]]
通过消息(以及this标记发件人):
parcer ! (this,data)
解析器不断地等待消息,如下所示:
def act(){
loop{
react{
case (producer, data)=> parse(data);
}
}
}
正常情况下程序可以完美运行。问题在于发送大量数据和许多消息(哈希大约有 10^4 个元素,内部哈希大约有 100 个元素,列表长 100),程序崩溃。它没有显示错误或异常。它就停止了。
问题似乎是我的生产者的工作速度比解析器快得多(目前我不需要多个解析器)。
看完之后scala邮箱大小限制 https://stackoverflow.com/questions/1617580/scala-mailbox-size-limit我想知道我的解析器的邮箱是否已达到其限制。该帖子还提供了一些解决方案,但我首先需要确定这就是问题所在。我该如何测试这个?
有没有办法知道演员的记忆极限?读取邮箱中已用/空闲内存怎么样?
任何尚未发布的工作流程建议那个链接 https://stackoverflow.com/questions/1617580/scala-mailbox-size-limit也欢迎。
Thanks,
首先,您不需要显式传递发送者,因为发送者无论如何都会被 Scala 参与者框架跟踪。您始终可以使用以下方法访问消息的发件人sender
.
从这里可以看出:scala.actors.MQueue https://lampsvn.epfl.ch/trac/scala/browser/scala/trunk/src///actors/scala/actors/MessageQueue.scala,参与者的邮箱被实现为链表,因此仅受堆大小的限制。
不过,如果您担心生产者非常快而消费者非常慢,我建议您探索一种节流机制。但我不会推荐采用已接受的问题答案的方法scala邮箱大小限制 https://stackoverflow.com/questions/1617580/scala-mailbox-size-limit.
一般来说,当系统压力很大时尝试发送过载消息似乎不是一个好主意。如果您的系统太忙而无法检查过载怎么办?如果过载消息的接收者太忙而无法对其采取行动怎么办?另外,对我来说,删除消息听起来并不是一个好主意。我认为您希望所有工作项目都得到可靠处理。
另外,我不会依赖mailboxSize
来确定负载。您无法区分不同的消息类型,并且只能从消费者本身进行检查,而不能从生产者进行检查。
我建议使用一种方法,让消费者在知道自己可以处理的情况下要求更多的工作。
下面是一个如何实现它的简单示例。
import scala.actors._
import Actor._
object ConsumerProducer {
def main(args: Array[String]) {
val producer = new Producer(Iterator.range(0, 10000))
val consumer = new Consumer(producer)
}
}
case class Produce(count: Int)
case object Finished
class Producer[T](source: Iterator[T]) extends Actor {
start
def act() {
loopWhile(source.hasNext) {
react {
case Produce(n: Int) => produce(n)
}
}
}
def produce(n: Int) {
println("producing " + n)
var remaining = n
source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
if(!source.hasNext) sender ! Finished
}
}
class Consumer(producer: Actor) extends Actor {
start
private var remaining = 0
def act() {
requestWork()
consume()
}
def consume(): Nothing = react {
case Finished => println("Finished")
case n: Int => work(n); requestWork(); consume()
}
def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }
def work(n: Int) = {
println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
remaining -= 1
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)