如何将多个 actor 作为源附加到 Akka 流?

2024-01-04

我正在尝试构建并运行一个 akka 流(在 Java DSL 中),其中 2 个 actor 作为源,然后是一个合并结点,然后是 1 个接收器:

    Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.backpressure());
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.backpressure());
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println));

    RunnableFlow<BoxedUnit> closed = FlowGraph.factory().closed(sink, (b, out) -> {
        UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2));
        b.from(src1).via(merge).to(out);
        b.from(src2).to(merge);
    });

    closed.run(mat);

我的问题是如何获取对源参与者的 ActorRef 引用以便向他们发送消息?如果有 1 个演员,我不会使用图形生成器,然后 .run() 或 runWith() 方法将返回 ActorRef 对象。但是如果源演员很多怎么办?这样的流程有可能实现吗?


回答我自己的问题,以防有人需要。

根据 jrudolph 的建议,我能够使用这样的 actor(在实际代码中,我做了一些比 2 个 ActorRef 的列表更好的事情):

    Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.fail());
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.fail());
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println));

    RunnableFlow<List<ActorRef>> closed = FlowGraph.factory().closed(src1, src2, (a1, a2) -> Arrays.asList(a1, a2), (b, s1, s2) -> {
        UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2));
        b.from(s1).via(merge).to(sink);
        b.from(s2).to(merge);
    });

    List<ActorRef> stream = closed.run(mat);
    ActorRef a1 = stream.get(0);
    ActorRef a2 = stream.get(1);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将多个 actor 作为源附加到 Akka 流? 的相关文章

  • 反转 HList 并转换为类?

    我使用 Shapeless 在 Akka 中累积物化值作为 HList 并将其转换为案例类 对于这个问题 您不必了解 Akka 太多 但默认方法将物化值累积为递归嵌套的 2 元组 这并不是很有趣 因此 Shapeless HLists 似乎
  • Akka 和 Typesafe 配置版本问题

    我尝试在 Tomcat 服务器上使用 akka 2 1 0 但我收到一个错误 要求我将配置库也放在类路径上 好吧 这不是问题 我将 Typesafe 的配置库 版本 1 0 0 最新 放在 lib 文件夹中 但是 我总是收到错误 8d315
  • Akka/Java getContext().become 带参数?

    在 Akka Scala 中 可以将参数传递给自定义接收函数 因此可以通过 params 传递整个 actor 状态 而无需使用可变变量 context become myCustomReceive param1 param2 但在 Jav
  • 使用 ScalaTest 测试时,为什么 Akka 会失败并显示“IllegalStateException:在终止或终止时无法创建子项”?

    这是我从 Akka 收到的错误 debug Running TaskDef com suredbits core util time TimeUtilUnitTest org scalatest tools Framework anon 1
  • groupBy 的子流可以依赖于它们生成的键吗?

    我有一个包含与用户关联的数据的流程 我还为每个用户提供了一个状态 我可以从数据库异步获取该状态 我想将我的流与每个用户一个子流分开 并在具体化子流时加载每个用户的状态 以便可以根据该状态来处理子流的元素 如果我不想合并下游的子流 我可以做一
  • 停止系统中的所有参与者而不关闭系统本身?

    在Akka 2 0中 有没有一个好的方法来关闭 user路径下的所有actor 例如 假设我执行以下操作 val system ActorSystem create mySystem system actorOf Props new MyA
  • 节点如何知道哪些节点已经看到集群当前状态?

    我正在阅读 akka 文档 并在理解他们的实现方式时遇到了一些麻烦Gossip 文档在这里 http doc akka io docs akka 2 4 common cluster html Gossip Protocol 让我困惑的部分
  • 如何使用 Akka HTTP 从多个参与者/Web 处理程序正确调用单个服务器?

    我有一个服务 我们称之为服务 A 它使用 Akka Server HTTP 来处理传入请求 我还有第 3 方应用程序 服务 B 它提供了多种 Web 服务 服务 A 的目的是转换客户端请求 调用服务 B 的一个或多个 Web 服务 合并 转
  • 通过连接池发出 http 请求时 Akka Flow 挂起

    我正在使用 Akka 2 4 4 并尝试从 Apache HttpAsyncClient 迁移 未成功 下面是我在项目中使用的代码的简化版本 问题是 如果我向流程发送超过 1 3 个请求 它就会挂起 到目前为止 经过6个小时的调试 我什至找
  • 我可以从任意异步任务访问 Http.Context.current() 吗?

    我正在开发一个移动应用程序的后端 该应用程序当前在 Play 2 1 1 上运行 作为处理某些请求的一部分 我们会发送推送通知 发送推送通知的下游请求应该完全异步 并且与移动客户端的原始请求响应分离 我想访问Http Context cur
  • 为什么 Actor.receive 是偏函数?

    Why is Actor receive部分功能 我总是可以使用带有匹配表达式的正则函数来代替它 It is a PartialFunction捕获消息被处理或未处理的可能性Actor 未处理的消息将 不让演员失败MatchError 产卵
  • Actor 中的 WebSocket.acceptWithActor 和 @Inject()(播放 2.5)

    WebSocket acceptWithActor不使用 Guice 实例化一个新的 Akka actor 在 Play 2 4 中 仍然可以通过导入来为我的 actor 使用注入器play api Play current 片段来自Rea
  • Akka 流如何不断实现?

    我在用阿卡流 http doc akka io docs akka stream and http experimental 1 0 scala stream index html在 Scala 中进行轮询AWS SQS https aws
  • 为什么参与者“询问”模式被视为反模式或“代码味道”?

    据我所知 询问 模式被认为是一种不好的做法 应该避免 相反 推荐的模式是 每个请求的参与者 模型 然而 这对我来说没有意义 因为 询问 模式正是这样做的 它为每个请求创建一个轻量级参与者 那么为什么这被认为是不好的 特别是当 future
  • 如何隐藏 Akka 远程 Actor 来查找?

    我正在运行 Akka 2 0 2 微内核 并希望为不受信任的远程参与者实现身份验证方案 首先想到的是设置一个身份验证参与者 当身份验证成功时 该参与者会返回对工作参与者的引用 但是 我应该如何保护工作参与者不被简单地通过 actorFor
  • 如何将 sbteclipse 插件添加到 SBT 0.10.x

    我想查看akka的源代码 似乎使用的是0 7 x版本的sbt 我将项目配置转换为0 10 x版本 当我在库依赖项中添加 sbteclipse 并运行 eclipse create src 生成 eclipse 项目时 它告诉 eclipse
  • 如何解析 Spray-routing 中的 get 请求参数?

    这就是代码部分的样子 get respondWithMediaType MediaTypes application json entity as HttpRequest obj gt complete println obj ok 我可以
  • 知道 akka actor 何时完成

    有几个人和我一起从事一个项目 一直在试图找出解决这个问题的最佳方法 看起来这应该是经常需要的标准东西 但由于某种原因我们似乎无法得到正确的答案 如果我有一些工作要做 并且我向路由器抛出一堆消息 我如何知道所有工作何时完成 例如 如果我们正在
  • 如何在 IntelliJ IDEA 中运行 akka actor

    来自 Akka 网站文档 然后 这个主要方法将创建所需的基础设施 运行演员 启动给定的主要演员并安排 一旦主要参与者终止 整个应用程序就会关闭 因此 您将能够使用类似于以下的命令运行上面的代码 下列的 java classpath akka
  • Akka/Scala:映射 Future 与 pipelineTo

    In Akka参与者 在发送一个Future结果给另一个演员 A 映射Future发挥作用tell结果给演员 B 定义一个onSuccess未来的回调 其中tell结果给演员 C 管道Future结果给演员pipeTo 其中一些选项已在上一

随机推荐

  • 获取Google广告ID并限制广告

    我正在构建一个 Unity Android 应用程序 并查看一些广告 我们正在考虑的一项服务需要我的谷歌广告 ID 并限制广告状态 以便进行服务器到服务器的转换跟踪 问题是我不确定如何在 Unity 中获取这些值 看来我需要某种形式的插件
  • 将值插入循环内的关联数组中

    我是 php 的新手 我使用 foreach 循环来遍历已解码对象的数组 我想为每次迭代向新数组输入值 这是代码的一部分 example of array before decoding it id 1 quantity 12 other
  • jQuery 在 jsFiddle 中可以运行,但在我的电脑上不行

    我是 jQuery 的新手 一整天都在绞尽脑汁试图确定为什么这个脚本在 jsFiddle 中运行而不是在我的计算机上运行 我没有服务器设置 我只是从桌面在浏览器中启动 html 代码在这里工作正常 http jsfiddle net 9Du
  • Julia 语言 - @async 任务中的状态 :: 当前目录

    我注意到 阅读 捕获了生产错误 Julia 中的不同任务 没有自己的工作目录 但当前目录 是共享的 我意识到在操作系统级别这是显而易见的 一个进程有一个工作目录 我的问题是第一 是否有任何其他明显或不太明显的全局状态我应该注意 显然是环境变
  • Box.COM 与 Android 应用程序集成-OAuthActivity-NullpointerException?

    我正在整合BOX COM与我的 android 应用程序 所以我正在使用BoxAndroidLibraryV2 and BoxJavaLibraryV2与我的申请 我可以运行示例 Android 应用程序BoxSDKSample 因此 我从
  • 使用 R 中的 fct_reorder 根据值重新排序因子

    My data structure list LoB c C C C A A B C A A C A B C B A C B A B C A B B A B C A B C B word c speed connection call bt
  • 如何使用 ansible 'expect' 模块来处理多个不同的响应?

    在这里 我尝试测试我的 bash 脚本 它提示了四次 bin bash date gt opt prompt txt read p enter one one echo one echo one gt gt opt prompt txt r
  • 如何在perl中删除文件的最后10行

    我将总行数作为用户输入 然后从文件中删除这些行数 我看到了这个 learn perl org faq perlfaq5 html How do I count the number of lines in a file 然后我厌倦了下面的简
  • 堆转储中不可能的 Java 内存引用

    我有一个在晚上 7 41 拍摄的 Java 堆转储 我正在使用 Eclipse 内存分析工具对其进行分析 堆转储包含 20 个会话对象 对堆中的这些会话对象之一使用 GC 根路径 命令会显示以下 3 个对该会话对象的引用 来自终结器线程拥有
  • 找不到netbeans 8.0.2中maven本地存储库路径的设置位置

    在 netbeans 中 当我转到 工具 gt 选项 gt Java gt Maven 面板时 显示 Maven 主页 捆绑Maven主页 https i stack imgur com 8yQIQ png 但是我在Netbeans 8 0
  • Heroku、Grails:如果使用多个 Web dyno,则会缺少资源

    我已经创建了 grails 应用程序并将其上传到 heroku 中 如果我使用 heroku 规模网络 1 一切看起来都不错 但如果我跑 heroku 规模网络 2 一些静态资源消失了 从日志中我可以看出 web 2 dyno 中的所有静态
  • 阴影位于文本上方

    我正在向 Xamarin UWP 项目添加阴影 但问题并不是真正特定于 Xamarin 而是一般来说是 UWP bool IsShadowSupported gt ApiInformation IsApiContractPresent Wi
  • 无法使用带有 connect 的 ref 调用子方法

    我想从子组件调用一个方法 按照这里的建议从父方法调用子方法 https stackoverflow com questions 37949981 call child method from parent 但是 当子组件使用来自react
  • 在 while 循环中延迟

    所以我想在 jquery 中执行延迟的 ajax 请求 直到收到特定的服务器响应 非空 我该怎么做呢 while data response null ajax done function data function doUntilResu
  • Python将列表分成n块

    我知道这个问题已经被讨论过很多次了 但我的要求不同 我有一个类似的列表 range 1 26 我想把这个列表分成固定数量n 假设 n 6 gt gt gt x 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 1
  • 联合和结构初始化

    我偶然发现了一段基于 C 语言联合的代码 代码如下 union struct char ax 2 char ab 2 s struct int a int b st u 12 1 printf d d u st a u st b 我只是不明
  • 以最少的比较次数对数组进行排序

    我的计算机科学作业需要一些帮助 我需要编写一个排序例程 在最坏的情况下使用 7 次比较对长度为 5 的数组进行排序 我已经证明 由于决策树的高度 将需要 7 次比较 我考虑使用决策树 硬编码 但这意味着该算法非常复杂 并且我的导师暗示这不是
  • GWT MVP架构优势

    我正在学习 GWT 并且在多个地方读到 使用 MVP 架构最适合开发 GWT 应用程序 我还读到 使用 MVP ARCH 进行测试很容易 有人可以解释一下为什么使用 MVP 架构进行测试很容易 另外 我正在使用 MVP 开发一个项目 我发现
  • for 循环的简洁数学符号

    如果这不属于这里 我很抱歉 但我正在寻找一种方法来描述我的代码的数学背景 使用 numpy 我对两个以上的维数组求和 a shape 10 5 2 b shape 5 2 c a b c shape 10 5 2 是否有一个纯粹的数学符号
  • 如何将多个 actor 作为源附加到 Akka 流?

    我正在尝试构建并运行一个 akka 流 在 Java DSL 中 其中 2 个 actor 作为源 然后是一个合并结点 然后是 1 个接收器 Source