如何订阅在不同 JVM 上运行的反应式流实现?

2024-03-25

假设我们有两个 Akka Stream 流,每个流都在自己的 JVM 上运行。

// A reactive streams publisher running on JVM 1:
val stringPublisher: Publisher[String] = Source(() => "Lorem Ipsum".split("\\s").iterator).runWith(Sink.publisher[String])

// A reactive streams subscriber running on JVM 2:
def subscriber: Subscriber[String] = Sink.foreach[String](println(_)).runWith(Source.subscriber[String])

// Subscribe the second stream to the first stream
stringPublisher.subscribe(subscriber)

此示例在一个 JVM 上运行良好,但我如何订阅在不同 JVM 上运行的发布者?

我是否必须使用消息传递/队列中间件,或者我可以使用反应式流 API 将两者连接在一起吗?


The 反应流 http://reactive-streams.org规范没有谈到分布式(跨网络)流,并且它的当前实现(以 Akka Streams 为例)都没有实现跨网络边界的流。这样做有点棘手(但可以做到,也可能会做到),因为它需要在消息丢失的情况下透明地重新传递。

简短的回答:你(目前)不能。然而,由于 Akka HTTP 是基于流的,并通过 TCP 应用背压,您可以通过以下方式连接流:基于 TCP 或 HTTP 的流 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-io.html并且背压将按预期工作。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何订阅在不同 JVM 上运行的反应式流实现? 的相关文章

  • 目标 applet 或 JVM 进程突然退出

    我收到消息Target applet or JVM process exited abruptly有时 它不会连续出现 所以我无法在 Java 控制台关闭之前跟踪日志 我尝试使用增加堆大小
  • Java动态澄清

    我在术语下看到了这段摘录Java 流行语在读一本关于Java的书时我不明白 Dynamic Java 程序带有大量的运行时类型信息 用于在运行时验证和解析对对象的访问 这使得以安全且方便的方式动态链接代码成为可能 这对于 Java 环境的稳
  • G1 GC 单个、非常长的年轻 GC 发生且 ParallelGCThreads=1

    I set ParallelGCThreads 1并使用G1 GC 所有其他JVM设置为默认设置 我跑PageRank在 Spark 1 5 1 上 有两个 EC2 节点 每个节点有 100 GB 堆 我的堆使用情况图如下 红色区域 年轻代
  • 为什么要实现finalize()?

    我已经阅读了很多 Java 新手问题finalize 令人困惑的是 没有人真正明确表示 Finalize 是一种不可靠的清理资源的方法 我看到有人评论说他们用它来清理连接 这真的很可怕 因为接近保证连接关闭的唯一方法是最后实现 try ca
  • 增加堆大小后无法启动 Glassfish

    我想增加 Glassfish 的堆大小 为此 我知道我可以达到 4GB java Xmx4000M version java version 1 6 0 26 Java TM SE Runtime Environment build 1 6
  • 如何使用IntelliJ IDEA ThreadDumpVisualizer插件分析Java线程转储

    我正在寻找使用一些线程转储分析器来分析 Java 线程转储并安装了ThreadDumpVisualizerIntelliJ IDEA 插件 但不知道如何使用它 插件页面 https plugins jetbrains com plugin
  • Akka 流如何不断实现?

    我在用阿卡流 http doc akka io docs akka stream and http experimental 1 0 scala stream index html在 Scala 中进行轮询AWS SQS https aws
  • 集群环境下的Spring Singleton

    正如中所讨论的this https stackoverflow com questions 1194129 singleton in cluster environmentpost 不适合使用单例聚集的环境 因为不同 JVM 中有多个单例对
  • Java 堆被无法访问的对象淹没

    我们的 Java EE 应用程序开始出现一些严重问题 具体来说 应用程序在启动后几分钟内就运行了高达 99 的老年代堆 不会抛出 OOM 但实际上 JVM 没有响应 jstat 显示老年代的大小根本没有减少 没有垃圾收集正在进行 并且kil
  • 使用 + 符号连接字符串

    今天我在读书Antonio 关于 toString 性能的博客 https antoniogoncalves org 2015 06 30 who cares about tostring performance 还有一段话 昨天曾经被认为
  • 我的代码中出现内存不足异常

    作为 Oracle 数据库压力测试的一部分 我正在长时间运行代码并使用 java 版本 1 4 2 简而言之 我正在做的是 while true Allocating some memory as a blob byte data new
  • 为什么 JVM 同时具有“invokespecial”和“invokestatic”操作码?

    两条指令都使用静态而不是动态调度 似乎唯一的实质性区别是invokespecial始终将一个对象作为其第一个参数 该对象是分派方法所属类的实例 然而 invokespecial实际上并没有把物体放在那里 编译器负责通过在发出之前发出适当的堆
  • 如何查看JVM中JIT编译的代码?

    有什么方法可以查看 JVM 中 JIT 生成的本机代码吗 一般用法 正如其他答案所解释的 您可以使用以下 JVM 选项运行 XX UnlockDiagnosticVMOptions XX PrintAssembly 根据特定方法进行过滤 您
  • java.library.path 中没有字体管理器

    以下代码在我的桌面上运行得很好 BufferedImage image new BufferedImage width height BufferedImage TYPE INT RGB Graphics g image getGraphi
  • Scala REPL 中的递归重载语义 - JVM 语言

    使用 Scala 的命令行 REPL def foo x Int Unit def foo x String Unit println foo 2 gives error type mismatch found Int 2 required
  • Java 类:匿名类、嵌套类、私有类

    有人能解释一下Java中匿名类 嵌套类和私有类之间的区别吗 我想知道与每个相关的运行时成本以及每个编译器的方法 这样我就可以掌握哪个最适合用于例如性能 编译器优化的潜力 内存使用以及其他 Java 编码人员的普遍可接受性 我所说的匿名类是指
  • 使用 javac 和 javax.tools.JavaCompiler 有什么区别?

    Maven 编译器插件文档states http maven apache org plugins maven compiler plugin 编译器插件用于编译项目的源代码 从 3 0 开始 默认编译器是 javax tools Java
  • OQL 包中的所有实例

    是否有可能在OQL检索属于一个包的所有对象 或者我可以查询wildcards 正如 haridsv 建议我尝试过的 SELECT from com example and SELECT a from com example but in V
  • Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

    我对 Scala 和 Akka 完全陌生 我有一个简单的 RunnableFlow Source gt Flow do some transformation gt Sink runForeach 现在我想要这样的东西 Source gt
  • 抛出 Java 异常时是否会生成堆栈跟踪?

    这是假设我们不调用 printstacktrace 方法 只是抛出和捕获 我们正在考虑这样做是为了解决一些性能瓶颈 不 堆栈跟踪是在构造异常对象时生成的 而不是在抛出异常对象时生成的 Throwable 构造函数调用 fillInStack

随机推荐