演员邮箱溢出。斯卡拉

2024-05-08

我目前正在与 scala 的两位演员合作。一、producer,产生一些数据并将其发送到parcer。生产者发送一个HashMap[String,HashMap[Object,List[Int]]]通过消息(以及this标记发件人):

parcer ! (this,data)

解析器不断地等待消息,如下所示:

def act(){
    loop{
      react{
        case (producer, data)=> parse(data);
      }
    }
}

正常情况下程序可以完美运行。问题在于发送大量数据和许多消息(哈希大约有 10^4 个元素,内部哈希大约有 100 个元素,列表长 100),程序崩溃。它没有显示错误或异常。它就停止了。

问题似乎是我的生产者的工作速度比解析器快得多(目前我不需要多个解析器)。

看完之后scala邮箱大小限制 https://stackoverflow.com/questions/1617580/scala-mailbox-size-limit我想知道我的解析器的邮箱是否已达到其限制。该帖子还提供了一些解决方案,但我首先需要确定这就是问题所在。我该如何测试这个?

有没有办法知道演员的记忆极限?读取邮箱中已用/空闲内存怎么样?

任何尚未发布的工作流程建议那个链接 https://stackoverflow.com/questions/1617580/scala-mailbox-size-limit也欢迎。

Thanks,


首先,您不需要显式传递发送者,因为发送者无论如何都会被 Scala 参与者框架跟踪。您始终可以使用以下方法访问消息的发件人sender.

从这里可以看出:scala.actors.MQueue https://lampsvn.epfl.ch/trac/scala/browser/scala/trunk/src///actors/scala/actors/MessageQueue.scala,参与者的邮箱被实现为链表,因此仅受堆大小的限制。

不过,如果您担心生产者非常快而消费者非常慢,我建议您探索一种节流机制。但我不会推荐采用已接受的问题答案的方法scala邮箱大小限制 https://stackoverflow.com/questions/1617580/scala-mailbox-size-limit.

一般来说,当系统压力很大时尝试发送过载消息似乎不是一个好主意。如果您的系统太忙而无法检查过载怎么办?如果过载消息的接收者太忙而无法对其采取行动怎么办?另外,对我来说,删除消息听起来并不是一个好主意。我认为您希望所有工作项目都得到可靠处理。

另外,我不会依赖mailboxSize来确定负载。您无法区分不同的消息类型,并且只能从消费者本身进行检查,而不能从生产者进行检查。

我建议使用一种方法,让消费者在知道自己可以处理的情况下要求更多的工作。

下面是一个如何实现它的简单示例。

import scala.actors._
import Actor._

object ConsumerProducer {
  def main(args: Array[String]) {
    val producer = new Producer(Iterator.range(0, 10000))
    val consumer = new Consumer(producer)
  }
}

case class Produce(count: Int)
case object Finished

class Producer[T](source: Iterator[T]) extends Actor {

  start

  def act() {
    loopWhile(source.hasNext) {
      react {
        case Produce(n: Int) => produce(n)
      } 
    }
  }

  def produce(n: Int) {
    println("producing " + n)
    var remaining = n
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
    if(!source.hasNext) sender ! Finished
  }
}

class Consumer(producer: Actor) extends Actor {

  start

  private var remaining = 0

  def act() {
    requestWork()
    consume()
  }

  def consume(): Nothing = react {
    case Finished => println("Finished")
    case n: Int => work(n); requestWork(); consume()
  }

  def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }

  def work(n: Int) = {
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
    remaining -= 1
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

演员邮箱溢出。斯卡拉 的相关文章

随机推荐