Akka 流如何不断实现?

2024-04-15

我在用阿卡流 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-index.html在 Scala 中进行轮询AWS SQS https://aws.amazon.com/sqs/队列使用AWS Java SDK https://aws.amazon.com/sdk-for-java/。我创建了一个演员出版商 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html#ActorPublisher以两秒的间隔将消息出队:

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
  implicit val materializer = ActorMaterializer()

  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

在我的应用程序中,我也尝试以 2 秒的间隔运行流程:

val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}

但是,当我运行我的应用程序时,我收到java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]以及随后的死信通知,这是由ActorMaterializer.

是否有推荐的方法来持续实现 Akka Stream?


我认为你不需要创建一个新的ActorPublisher每 2 秒一次。这看起来是多余的并且浪费内存。另外,我不认为 ActorPublisher 是必要的。根据我对代码的了解,您的实现将有越来越多的流都查询相同的数据。每个Message来自客户端的数据将由 N 个不同的 akka Stream 处理,更糟糕的是,N 会随着时间的推移而增长。

用于无限循环查询的迭代器

您可以使用 scala 从 ActorPublisher 获得相同的行为Iterator。可以创建一个连续查询客户端的迭代器:

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
  client receiveMessage (new ReceiveMessageRequest(url).getMessages)
}

def messageListIteartor : Iterator[Iterable[Message]] = 
  Iterator continually messageListStream

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

此实现仅在所有先前的消息都已被消耗时才查询客户端,因此是真正的reactive http://www.reactivemanifesto.org/。无需跟踪固定大小的缓冲区。您的解决方案需要一个缓冲区,因为消息的创建(通过计时器)与消息的消耗(通过 println)是分离的。在我的实现中,创建和消费是紧密耦合 https://www.google.com/search?q=kermit%20ms%20piggy%20wedding%20images通过背压。

Akka 流源

然后,您可以使用此迭代器生成器函数来提供 akka 流源:

def messageSource : Source[Message, _] = Source fromIterator messageIterator

流动形成

最后你可以使用这个源来执行println(附注:你的flow值实际上是一个Sink since Flow + Sink = Sink)。使用您的flow问题的价值:

messageSource runWith flow

一个 akka Stream 处理所有消息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Akka 流如何不断实现? 的相关文章

随机推荐

  • Leakcanary 使用 AppCompatActivity 和 FragmentStatePagerAdapter 显示片段泄漏

    我已经遇到这个问题有一段时间了 我真的不知道还能尝试什么 我在我的项目中使用了 Leakcanary 这样我就可以强迫自己以 正确的方式 学习 android 而不会选择导致内存泄漏的模式 我正在构建一个应用程序 我已经将其精简了很多 但我
  • 如何使用 jQuery 处理复选框的更改?

    我有一些代码
  • 如何禁用 Nexus Galaxy 画布上的点击突出显示?

    我正在编写一个使用 HTML5 画布的 Web 应用程序并在 Nexus Galaxy 上进行测试 当用户点击画布时 它会突出显示青色一秒钟 即使我在 touchstart 事件上调用了 PreventDefault 我也尝试过一些CSS规
  • jQuery 和 CSS - 按高度剪切文本,不截断

    因为我想切换文本 所以需要隐藏其中的一部分 Problem 我的文本高度将为 X 或更少像素 div 的高度取决于侧边栏的高度 并且不像此演示那样静态 如果最后一行的字母现在被截断 参见演示 我也想隐藏该行 看看我的演示 http jsfi
  • Oracle REGEX_SUBSTR 不支持空值

    我有一个 regex substr 不支持 null 值的问题 select REGEXP SUBSTR 2035197553 2 S 14 JUN 14 P 1 1 AS phn nbr REGEXP SUBSTR 2035197553
  • iOS Safari – 如何禁用过度滚动但允许可滚动 div 正常滚动?

    我正在开发一个基于 iPad 的网络应用程序 需要防止过度滚动 使其看起来不像网页 我目前正在使用它来冻结视口并禁用过度滚动 document body addEventListener touchmove function e e pre
  • 将 JPA AttributeConverter 用于布尔 Y/N 字段:“无法呈现布尔文字值”

    我正在实施解决方案here https stackoverflow com a 22368268 26535将 Y N 列转换为布尔值 Basic optional false Column name ACTIVE YN Convert c
  • 使用数字属性的 MVC3 DataAnnotationsExtensions 错误

    我已经安装了 Scott 的 Kirkland DataAnnotationsExtensions 在我的模型中我有 Numeric public double expectedcost get set 在我看来 Html EditorFo
  • 根据磁盘可用空间获取节点IP

    我正在尝试编写一个 Ansible 剧本来检查多个服务器上的磁盘空间 到目前为止 这是我的 Ansible 剧本 hosts all become yes tasks name Check freespace shell df h awk
  • 在第二次编辑后刷新表单[重复]

    这个问题在这里已经有答案了 嘿大家好 我目前正在尝试在更改完成后立即刷新表单 在我的第一个表单上 我按下 创建 按钮 这将打开另一个表单 form2 第二个表单将具有输入字段 并允许您输入填充第一个表单上的组合框的值 在第二个表单上有一个
  • 标题消息就像 Stack Overflow 中一样

    这是我第一次访问堆栈溢出 我看到了一条漂亮的标题消息 其中显示了文本和关闭按钮 标题栏是固定的 非常能吸引访问者的注意力 我想知道你们中是否有人知道获得相同类型标题栏的代码 快速的纯 JavaScript 实现 function Messa
  • openui5:如何在 RowRepeater 中获取当前 JSON 模型元素

    我无法获取绑定到 RowRepeater 元素的当前 JSON 模型元素 对于表和列表 我只需检索当前索引 或多个索引 并根据这些值 指向 JSON 模型中的匹配元素 但是 RowRepeater 元素没有当前索引属性 我觉得我应该能够直接
  • 主题消息可以在activemq中持久化吗?

    我对 JMS 和 ESB 非常陌生 我使用 activemq 作为 JMS 使用 mule 作为 ESB 当我将消息从一个队列转发到另一个队列时 jms 连接器参数 persistentDelivery 为 true 它会在 activem
  • 将部分 Activity/Fragment 保存为图像

    我试图保存我的活动的一部分 没有工具栏和状态栏 我现在拥有的代码可以保存整个屏幕 请参考下图 我现在拥有的代码 llIDCardRootView LinearLayout view findViewById R id ll id card
  • Laravel psr-4 不自动加载

    我有一个在本地运行良好的 Laravel 项目 Mavericks 但 psr 4 下的类未加载到我们的阶段服务器 CentOS 上 每次尝试作曲家更新或运行 artisan 命令时 我都会收到反射 未找到类 错误 我所有的应用程序特定类都
  • 无法将 IBOutlet 连接到 Storyboard

    我最近开始使用故事板为我的 iPad 应用程序创建 iPhone 界面 我已将项目更改为 Universal 而不是 iPad 并在项目摘要屏幕中将所需的故事板分配给其设备 但是当我尝试使用 control drag 将任何元素连接到一段代
  • 如何以编程方式激活“在高 dpi 设置上禁用显示缩放”[重复]

    这个问题在这里已经有答案了 我正在 Visual Studio 2010 上使用 C 开发 Windows 窗体应用程序 我发现如果我使用高 dpi 显示设置 该应用程序会缩放 但是 如果我通过 Windows 资源管理器上的鼠标右键单击菜
  • 在 Visual Studio 2017 中出现未处理的异常后启用编辑

    在 Visual Studio 2017 中出现未处理的异常后如何继续执行 在 2015 及以下版本中 可以通过单击轻松完成此操作Enable Editing它 将调用堆栈展开到异常之前的点 然后就可以编辑执行点 变量和代码 当库抛出异常时
  • 如何使用“here-doc”将行打印到文件?

    基本上 这是我在过去半小时内编程和使用 Google 的结果 试图实现一个简单的事情 从以下位置获取用户输入 STDIN并将它们写入结构化 XML 文件作为输出 下面是我丑陋的代码 bin perl print img URL img lt
  • Akka 流如何不断实现?

    我在用阿卡流 http doc akka io docs akka stream and http experimental 1 0 scala stream index html在 Scala 中进行轮询AWS SQS https aws