为什么我的 RxJava Flowable 在使用observeOn 时不考虑背压?

2024-03-22

我正在尝试创建一个Flowable它会发出有关背压的事件以避免内存问题,同时并行运行转换的每个阶段以提高效率。我创建了一个简单的测试程序来推理程序不同步骤的行为以及何时发出事件与在不同阶段等待事件。

我的程序如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
      .stream().map(i -> {
        System.out.println("emitting:" + i);
        return i;
      });

  Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
  System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));

  Long count = flowable.onBackpressureBuffer(10)
      .buffer(10)
      .flatMap(buf -> {
        System.out.println("Sleeping 500 for batch");
        Thread.sleep(500);
        System.out.println("Got batch of events");
        return Flowable.fromIterable(buf);
      }, 1)
      .map(x -> x + 1)
      .doOnNext(i -> {
        System.out.println(String.format("Sleeping : %d", i));
        Thread.sleep(100);
        System.out.println(i);
      })
      .count()
      .blockingGet();

  System.out.println("count: " + count);
}

当我运行这个程序时,我得到了符合预期的背压的输出,其中一批事件被发出到大小buffer,然后对它们进行平面映射,最后采取一些操作将它们一一打印:

Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
Sleeping 500 for batch
Got batch of events
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
10
emitting:10
emitting:11
emitting:12
emitting:13
emitting:14
emitting:15
emitting:16
emitting:17
emitting:18
emitting:19
Sleeping 500 for batch
Got batch of events
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13

但是,如果我尝试通过添加一些调用来并行化这里的不同操作阶段.observeOn(Schedulers.computation())那么我的程序似乎不再考虑背压。我的代码现在看起来像:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
      .stream().map(i -> {
        System.out.println("emitting:" + i);
        return i;
      });

  Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
  System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));

  Long count = flowable.onBackpressureBuffer(10)
      .buffer(10)
      .observeOn(Schedulers.computation())
      .flatMap(buf -> {
        System.out.println("Sleeping 500 for batch");
        Thread.sleep(500);
        System.out.println("Got batch of events");
        return Flowable.fromIterable(buf);
      }, 1)
      .map(x -> x + 1)
      .observeOn(Schedulers.computation())
      .doOnNext(i -> {
        System.out.println(String.format("Sleeping : %d", i));
        Thread.sleep(100);
        System.out.println(i);
      })
      .observeOn(Schedulers.computation())
      .count()
      .blockingGet();

  System.out.println("count: " + count);
}

我的输出如下,其中所有事件都是预先发出的,而不是考虑各个执行阶段指定的背压和缓冲区:

Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
emitting:10
Sleeping 500 for batch
emitting:11
emitting:12
... everything else is emitted here ...
emitting:998
emitting:999
Got batch of events
Sleeping 500 for batch
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
Got batch of events
Sleeping 500 for batch
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
Got batch of events
Sleeping 500 for batch
10
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
13
Sleeping : 14
14
Sleeping : 15
Got batch of events
Sleeping 500 for batch
15
Sleeping : 16
16
Sleeping : 17
17
Sleeping : 18
18
Sleeping : 19
19
Sleeping : 20
Got batch of events
Sleeping 500 for batch
20
Sleeping : 21
21
Sleeping : 22
22
Sleeping : 23
23
Sleeping : 24
24
Sleeping : 25
Got batch of events
Sleeping 500 for batch
25

假设我的批处理阶段正在调用外部服务,但由于延迟,我希望它们并行运行。我还希望在给定时间控制内存中的项目数量,因为最初发出的项目数量可能变化很大,并且批量操作的阶段运行速度比事件的初始发出慢得多。

我怎样才能拥有我的Flowable尊重跨a的背压Scheduler?为什么当我偶尔拨打电话时,它似乎只是不尊重背压observeOn?


我怎样才能让我的Flowable尊重跨调度程序的背压

其实,申请onBackpressureBuffer使其上方的源与下游施加的任何背压断开,因为它是无界运算符。你不需要它,因为Flowable.fromIterable(顺便说一下,R​​xJava 有一个range操作员)支持并尊重背压。

为什么当我加入对observeOn的调用时,它似乎只是不尊重背压?

在第一个示例中,会发生自然背压,称为调用堆栈阻塞。 RxJava 默认情况下是同步的,大多数运算符不会引入异步,就像第一个示例中没有引入异步一样。

observeOn引入了异步边界,因此理论上,阶段可以彼此并行运行。它有一个默认的 128 个元素预取缓冲区,可以通过其重载之一进行调整。然而,在您的情况下, buffer(10) 实际上会将预取量放大到 1280,这仍然可能导致一次性完全消耗您的 1000 个元素长源。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么我的 RxJava Flowable 在使用observeOn 时不考虑背压? 的相关文章

  • JSF:初始请求和回发请求?

    请看一下 JSF 中的下面这行代码
  • 区域设置的 Java 日期格式

    我怎样才能找到DateFormat对于给定的Locale DateFormat getDateInstance int Locale 例如 import static java text DateFormat DateFormat f ge
  • Android Studio - 无法识别的 VM 选项“MaxPermSize=256m”

    我刚刚在 Elementary OS 0 3 Freya 上安装了 Android Studio 并使用终端运行它 然而 在我第一次启动时 显示一条错误消息 Gradle 测试 项目刷新失败 无法启动守护进程 这个问题可能是由 守护进程的配
  • 关于线程的停止

    我开发了一个代码 它将在执行时启动两个线程 public class MyThread1 extends Thread extend thread class public synchronized void run synchronize
  • 何时在 Springs @Configuration 中将 proxyBeanMethods 设置为 false?

    当查看 spring 自动配置时源代码 https github com spring projects spring boot tree master spring boot project spring boot autoconfigu
  • ANT - 如何使用 javac 排除、排除文件?

    查看了 stackoverflow 上的几篇文章以及其他来源 在线 ANT 定义指南 但到目前为止没有一个有帮助 我无法从编译中排除该文件 我只有一个文件想要从编译中排除 而 ANT 文档并没有真正说明细节 我试图排除HTMLParser
  • 我的 Java Web 应用程序中的 ClassNotFoundException/NoClassDefFoundError

    我使用 Java 开发了一个 Web 应用程序 当我将其部署到我的应用程序服务器 Jetty Tomcat JBoss GlassFish 等 时 会抛出错误 我可以在堆栈跟踪中看到此错误消息 java lang ClassNotFound
  • 如何向正在运行的 Linux 进程发送 Ctrl-Break?

    我正在调试在 Sun 的 JDK 1 4 2 18 上运行的应用程序中的内存泄漏 该版本似乎支持命令行参数 XX HeapDumpOnCtrlBreak 这可能会导致 JVM 在遇到控制中断时转储堆 如何将其发送到 Linux 机器上的后台
  • 比 O(n) 更好的范围交集算法?

    范围交集是一个简单但不平凡的问题 已经回答过两次了 查找数字范围交集 https stackoverflow com questions 224878 find number range intersection 比较日期范围 https
  • Spring - 使用 new 是一种不好的做法吗?

    正在创建对象by hand 即使用new操作员而不是注册Springbean 和使用依赖注入被认为是不好的做法吗 我的意思是 确实Spring IoC容器必须了解应用程序中的所有对象吗 如果是这样 为什么 你希望 Spring 创建 bea
  • 将阻塞调用包装为异步,以实现更好的线程重用和响应式 UI

    我有一个类负责通过调用遗留类来检索产品可用性 该遗留类本身通过进行 BLOCKING 网络调用在内部收集产品数据 请注意 我无法修改旧版 API 的代码 由于所有产品都是相互独立的 因此我希望并行收集信息 而不会创建任何不必要的线程 也不会
  • (简单)boost thread_group 问题

    我正在尝试编写一个相当简单的线程应用程序 但我对 boost 的线程库很陌生 我正在开发的一个简单的测试程序是 include
  • 如何使用 JAVA 将本地图像而不是 URL 发送到 Microsoft Cognitive Face API

    我正在尝试使用 Microsoft 认知服务的 Face API 我想知道如何通过 Rest API 调用将本地图像发送到 Face API 并使用它请求结果JAVA 有人可以帮我解决这个问题吗 Microsoft 在其网站上提供的测试选项
  • 光线追踪三角形

    我正在用java编写一个光线追踪器 并且我能够追踪球体 但我相信我追踪三角形的方式有问题 据我了解 这是基本算法 首先确定射线是否与plane三角形已打开 剪裁所有点 使它们与三角形位于同一平面上 因此xy以平面为例 根据沿着新平面向任意方
  • 在可序列化 Java 类中使用记录器的正确方法是什么?

    我有以下 doctored 我正在开发的系统中的类以及Findbugs http findbugs sourceforge net 正在生成一个SE BAD FIELD http findbugs sourceforge net bugDe
  • JDBC多线程插入可以吗?

    我目前正在开发一个 Java 项目 我需要准备一个大的 对我来说 mysql 数据库 我必须使用 Jsoup 进行网页抓取并将结果存储到我的数据库中 据我估计 我将大约插入 1 500 000 到 2 000 000 条记录 在我的第一次试
  • 在 Spring 中以编程方式解析 AliasFor 注释值

    我有一个注释 Target ElementType TYPE Retention RetentionPolicy RUNTIME public interface A Class value 这是在课堂上使用的 B D class publ
  • 在 Maven Shade 插件中包含依赖项

    我正在尝试使用 Apache 的 commons lang3 创建一个可部署的 jar 但是 我的 Hadoop 所在的 AWS 集群不包含此库 因此我收到了 classNotFoundException 我想我需要手动添加该依赖项 但我在
  • 当考虑性能时如何从文件中读取整数?

    我正在 CodeEval 上执行一些任务 基本上任务非常简单 打印出从文件中读取的所有整数的总和 我的解决方案如下 import java io File import java io IOException import java io
  • Java邮件,设置回复地址不起作用

    我用java写了一个小的电子邮件发送程序 它有from to and reply to地址 当客户端尝试回复邮件时 应该能够回复reply to地址 目前它不起作用 我的代码如下 File Name SendEmail java impor

随机推荐