监控封闭图 Akka Stream

2024-04-03

如果我创建了一个RunningGraph在 Akka Stream 中,我怎么知道(从外部)

  1. 当所有节点因完成而被取消时?
  2. 当所有节点因错误而停止时?

我认为没有办法对任意图执行此操作,但是如果您可以控制图,则只需将监视接收器附加到每个可能失败或完成的节点的输出(这些节点具有至少一个输出),例如:

import akka.actor.Status

// obtain graph parts (this can be done inside the graph building as well)
val source: Source[Int, NotUsed] = ...
val flow: Flow[Int, String, NotUsed] = ...
val sink: Sink[String, NotUsed] = ...

// create monitoring actors
val aggregate = actorSystem.actorOf(Props[Aggregate])
val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source", aggregate)))
val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow", aggregate)))

// create the graph
val graph = GraphDSL.create() { implicit b =>
   import GraphDSL._

   val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor, Status.Success(()))),
   val flowMonitor = b.add(Sink.actorRef(flowMonitorActor, Status.Success(())))

   val bc1 = b.add(Broadcast[Int](2))
   val bc2 = b.add(Broadcast[String](2))

   // main flow
   source ~> bc1 ~> flow ~> bc2 ~> sink

   // monitoring branches
   bc1 ~> sourceMonitor
   bc2 ~> flowMonitor

   ClosedShape
}

// run the graph
RunnableGraph.fromGraph(graph).run()

class Monitor(name: String, aggregate: ActorRef) extends Actor {
  override def receive: Receive = {
    case Status.Success(_) => aggregate ! s"$name completed successfully"
    case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}"
    case _ =>
  }
}

class Aggregate extends Actor {
  override def receive: Receive = {
    case s: String => println(s)
  }
}

也可以只创建一个监控参与者并在所有监控接收器中使用它,但在这种情况下,您将无法轻松地区分失败的流。

而且还有watchTermination() http://doc.akka.io/api/akka/2.4.5/index.html#akka.stream.scaladsl.Source@watchTermination%5BMat2%5D()(matF:(Mat,scala.concurrent.Future%5Bakka.Done%5D)=%3EMat2):FlowOpsMat.this.ReprMat%5BOut,Mat2%5D关于源和流的方法,允许实现与此时流一起终止的未来。我认为它可能很难使用GraphDSL,但对于常规流方法,它可能如下所示:

import akka.Done
import akka.actor.Status
import akka.pattern.pipe

val monitor = actorSystem.actorOf(Props[Monitor])
source
  .watchTermination()((f, _) => f pipeTo monitor) 
  .via(flow).watchTermination((f, _) => f pipeTo monitor)
  .to(sink)
  .run()

class Monitor extends Actor {
  override def receive: Receive = {
    case Done => println("stream completed")
    case Status.Failure(e) => println(s"stream failed: ${e.getMessage}")
  }
}

您可以先改变未来,然后再将其值传递给参与者以区分流。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

监控封闭图 Akka Stream 的相关文章

随机推荐

  • 如何将联系人添加到 ActiveCampaign API v3 中的列表

    根据v3docs https developers activecampaign com v3 reference lists我应该能够使用联系人 API 将联系人添加到列表中 但我不知道如何执行此操作 因为我在下面看不到任何相关列表Con
  • 如何选择 PrimeFaces TreeTable 中的所有内容?

    我有一个 PrimeFaces 树表 想添加一个复选框 允许用户选择 取消选择全部 这是我的 xhtml 的摘录
  • LibGDX 路径 (CatmullRomSpline) 恒速

    我试图使用 LibGDX CatmullRomSpline 在路径上实现恒定速度 但在使其正常工作时遇到问题 我已经尝试对这个主题进行了很多研究 包括阅读 LibGDX wiki 但他们对实现恒定速度的解释并没有真正意义 我无法让他们的方法
  • 如何获取已安装的 OLE DB 提供程序的列表?

    Microsoft Excel 允许从 其他来源 导入数据 其中一种选择是使用 OLE DB 提供程序 如何获取可用 OLE DB 提供程序的列表 如果您有可用的 powershell 只需将其粘贴到 powershell 命令提示符中 f
  • 这是如何使用 CloudConfigurationManager 设置上下文连接字符串?

    我希望使用 CloudConfigurationManager 以便可以利用 Azure 配置文件 我想使用连接字符串 我在 Cloud cscfg 中添加了一个字符串来配置实体框架上下文 我像这样配置我的上下文 public Domain
  • SOFT_INPUT_ADJUST_RESIZE 从 android 30 开始已弃用

    I used SOFT INPUT ADJUST RESIZE为了在键盘弹出时显示所有内容 根据文档 我添加了新的代码片段 if Build VERSION SDK INT gt Build VERSION CODES R requireA
  • UnsatisfiableError:发现以下规范彼此不兼容:

    我在 Ubuntu 16 04 上运行 并且遇到了 Anaconda 问题 由于软件包不一致 我无法再安装 更新或删除软件包 conda info NVIDIA no NVIDIA devices found active environm
  • 如何在签到前获取 place_id ?

    我的应用程序允许用户使用纬度 经度和地名登录 Facebook 但我不知道如何处理 place 参数 有什么方法可以获取或创建一个吗 每个可签到的地点都必须有一个有效的地点 Facebook 页面 The place参数取入page id地
  • admob广告不显示

    我制作了一个简单的应用程序 并在其中添加了一个广告 但它没有显示在设备中 我的代码中没有错误 xml页面
  • Dockerfile 构建引发不够范围:公共 openjdk 映像授权失败

    我正在尝试使用以下命令运行以下 Dockerfile docker compose up remove orphans force recreate build d Dockerfile FROM maven 3 6 3 jdk 11 as
  • Java中如何查找CPU密集型类?

    我有一个使用多线程的 Java 大型程序 在某些情况下 程序开始使用我的八核系统中的三个核心的 100 在正常使用中 程序以 1 2 的速度使用所有核心 如何找到重载核心的类 使用分析器 例如与 jdk 1 6 0 10 捆绑的 jvisu
  • BIO中的B代表什么?

    OpenSSL 使用称为 I O 抽象BIO http www openssl org docs crypto bio html 但我在文档中找不到任何地方说明 B 代表什么 IO 显然是输入 输出 各种网站表明 B 代表basic or
  • jQuery 自动淡入下一个 html 元素然后循环重新启动

    I have a a a a 其中 i 是从 1 开始的当前迭代的值 并将呈现如下内容 a a a a 我需要的是一个脚本 仅使第一个元素保持可见 然后在几秒钟 假设 4 5 后淡出并淡入下一个元素 并重复这个循环直到最后一个元素 然后循环
  • 如何知道这个线程是否是UI线程

    Android 上有什么方法可以知道运行我的代码的线程是否是 UI 线程 在摇摆有SwingUtilities isEventDispatchThread 告诉我我是否在 UI 线程上 Android SDK 中是否有任何函数可以让我知道这
  • 将查询转换为存储过程

    下面是一个包含临时表的大查询 我一直在试图找出将其转换为存储过程的语法 我似乎无法弄清楚 我应该能够使用变量选择日期范围 Report Start DT and Report End DT CREATE PROCEDURE gw ppp d
  • Selenium Headless Chrome 和语言设置

    我正在使用 python 中的 selenium 和 chrome 驱动程序 我可以像这样设置 chrome 浏览器的语言 options webdriver ChromeOptions options add experimental o
  • 多重继承:派生类仅从一个基类获取属性?

    我试图学习Python中多重继承的概念 考虑一个类Derv派生自两个类 Base1 and Base2 Derv仅继承第一个基类的成员 class Base1 def init self self x 10 class Base2 def
  • Angular 路由的奇怪行为

    我之前曾问过我奇怪的路由问题 我现在有一个解决方案 但仍然偶尔会有奇怪的行为 我的页面应该转到 登录 一旦成功 然后转到 主 这实际上现在有效 除了转到登录 gt 主页 gt 空白 gt 主页 最后的 闪烁 或刷新我不明白 这解释了最初的行
  • SQL - 抑制重复的*相邻*记录

    我需要运行一个 Select 语句 DB2 SQL 该语句不会根据某个字段提取相邻行重复项 具体来说 我想找出数据何时changes 这变得很困难 因为它可能会变回其原始值 也就是说 我有一个大致类似于下面的表格 按字母排序 然后按日期排序
  • 监控封闭图 Akka Stream

    如果我创建了一个RunningGraph在 Akka Stream 中 我怎么知道 从外部 当所有节点因完成而被取消时 当所有节点因错误而停止时 我认为没有办法对任意图执行此操作 但是如果您可以控制图 则只需将监视接收器附加到每个可能失败或