RxJava Observable.fromEmitter 奇怪的背压行为

2024-03-02

我一直在利用Observable.fromEmitter()作为一个绝佳的替代品Observable.create()。我最近遇到了一些奇怪的行为,但我不太明白为什么会出现这种情况。我真的很感谢对背压和调度程序有一定了解的人来看看这个。

public final class EmitterTest {
  public static void main(String[] args) {
    Observable<Integer> obs = Observable.fromEmitter(emitter -> {
      for (int i = 1; i < 1000; i++) {
        if (i % 5 == 0) {
          sleep(300L);
        }

        emitter.onNext(i);
      }

      emitter.onCompleted();
    }, Emitter.BackpressureMode.LATEST);

    obs.subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"

    sleep(10000L);
  }

  private static void sleep(Long duration) {
    try {
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

该应用程序的输出是

Received 1
Received 2
...
Received 128

然后它仍然停留在 128(大概是因为这是 RxJava 的默认缓冲区大小)。

如果我更改指定的模式fromEmitter() to BackpressureMode.NONE,那么代码将按预期工作。如果我删除对observeOn(),它也按预期工作。有谁能够解释为什么会出现这种情况吗?


这是同池死锁的情况。subscribeOn安排下游request在它正在使用的同一线程上,但如果该线程正忙于睡眠/发射循环,则请求永远不会传递到fromEmitter因此一段时间后LATEST如果主源等待足够长的时间,就会开始删除元素,直到最后一个值 (999) 被传递为止。 (这与以下情况类似onBackpressureBlock我们删除了。)

If subscribeOn如果没有执行此请求调度,该示例将正常工作。

我已经打开了an issue https://github.com/ReactiveX/RxJava/issues/4735制定解决方案。

目前的解决方法是使用更大的缓冲区大小observeOn(有过载)或使用fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava Observable.fromEmitter 奇怪的背压行为 的相关文章

随机推荐

  • CSS 位置元素“固定”在滚动容器内

    我想知道是否有人找到了解决方案 我正在寻找一种将元素附加到滚动容器顶部的解决方案 HTML div class container div class header title div div class element div about
  • CSS响应中心部门

    我想将一些有背景图像的 div 居中 该 div 的响应存在问题 因为如果我将宽度设置为 80 高度设置为 80 则背景图像不会位于中心 我尝试了一切 但图片不能只站在中心 如果浏览器更小或更大 这是一个非常大的问题 所以如果你看图片 我想
  • R Shiny:从 Excel 复制单元格并将其粘贴到 Shiny 应用程序中,然后使用它们创建数据表

    我正在开发一个 R Shiny 应用程序 我需要开发以下功能 我需要从 Excel 中复制单元格行 开始时一次一列 然后使用 selectizeInput textInput 或 textAreaInput 将它们粘贴到 Shiny 中 数
  • 将 STL 容器 转换为容器

    我正在寻找一种方法来制定具有以下内容的课程 使用具有最大 常量 的指针的 STL 容器的接口 但它会在内部改变所指向的对象 与非常量模拟相比 没有额外的运行时开销 理想情况下 与非常量版本相比 该解决方案不会编译为额外的代码 因为常量 非常
  • 有条件禁用/重新启用 jQuery 单击事件

    我在禁用和重新启用链接上的点击事件时遇到问题 设置为一行 4 列 每列包含一个链接和隐藏内容框 单击链接时 它会展开该行并显示特定于该列的内容框 单击链接并展开行后 所有其他链接都会淡出 然后 您可以重新单击打开的链接以关闭该行并取消淡入淡
  • 如何使用 NDK 17 为 64 位 Android 构建 OpenSSL 1.1.1

    无法为 64 位 Android 构建 OpenSSL 以下是我已采取的步骤 下载了setenv android sh from https wiki openssl org images 7 70 Setenv android sh ht
  • 使用 lubridate 进行矢量化时区转换

    我有一个数据框 其中包含一列日期时间字符串 library tidyverse library lubridate testdf data frame mytz c Australia Sydney Australia Adelaide A
  • 从文本文件中读取并将其加载到 matlab 中的矩阵中[重复]

    这个问题在这里已经有答案了 我有一个名为坐标 txt 的文本文件 格式如下 0 0 0 0 95 0 32 0 02 1 02 0 26 0 96 0 73 0 6 0 52 0 77 0 6 0 71 0 28 0 0 95 0 14 0
  • N个矩形的并集周长

    我想知道解决这个问题的有效方法 给定N个矩形 并给出左上角和右下角 请求N个矩形的并集周长 我只有O N 2 算法太慢 所以请寻找更高效的算法 您可以假设坐标值为正整数且小于 100000 EDIT For example in this
  • simpleXML 根据属性获取节点子节点[重复]

    这个问题在这里已经有答案了 我正在尝试解析我通过其属性之一引用的节点的值 但我不确定语法 XML
  • Netty如何使用线程池?

    您能解释一下 Netty 如何使用线程池来工作吗 我是否理解正确 有两种线程池 老板和工人 Boss 用来做 I O worker 用来调用用户回调 messageReceived 来处理数据 这是来自 NioServerSocketCha
  • 使用什么工具来解析Python中的编程语言?

    您可以推荐哪种 Python 工具来解析编程语言 它应该允许源代码中语言语法的可读表示 并且应该能够扩展到复杂的语言 语法像 Python 本身一样复杂的语言 当我搜索时 我主要找到 pyparsing 我将对其进行评估 但当然我对其他替代
  • 用于单元/集成测试的嵌入式动物园管理员

    是否有嵌入式动物园管理员以便我们可以在单元测试中使用它 它可以与测试一起发货并开箱即用 也许我们可以模拟一些服务并注册到嵌入式动物园管理员 The Curator https github com Netflix curator wiki框
  • Mac 版 Github:缺少拉取请求按钮

    周六 我在 Mac 上安装了 Github Desktop 并尝试了 Github Workflow 创建一个分支 提交更改并执行拉取请求 一切顺利 今天我在工作中安装了 Mac 版 Github 但找不到 Pull Request 按钮
  • Typescript 在 vs 2015 ctp 6 中禁用保存时编译

    我需要知道如何禁用打字稿文件保存时编译 默认情况下启用 Typescript 编译 您可以执行下一步来禁用它 选择并单击 卸载项目 菜单项 选择已卸载的项目并单击 编辑 kproj 将新的 PropertyGroup 节点添加到项目根节点
  • 捆绑链接的 JavaScript 文件

    我正在使用 Visual Studio 2012 和 MVC4 我已将链接文件 来自另一个项目 添加到我的 MVC4 应用程序中 以下是该文件的属性 构建操作 内容 复制到输出目录 不复制 这是我的捆绑包的示例 bundles Add ne
  • WPF DataGrid AlternatingRowBackground 和 RowStyle 优先级

    我该如何做我的RowStyle后申请AlternatingRowBackground 我想要物品 有IsOrange as true具有Orange背景 无论交替的行背景如何 目前情况并非如此 XAML
  • 当选择文本时,如何用我自己的视图替换 UIMenuController?

    当选择文本时 默认情况下会弹出一个 UIMenuController 其中包含剪切 复制 粘贴等功能 我想用我自己的自定义视图替换它 外观相似 但高两倍 以便我可以有两行按钮 自定义视图 我怎样才能做到这一点 我知道没有简单的方法 我预计即
  • Mongodb动态like运算符

    在 mongodb 中相当于 sql like 运算符是 db users find shows m 使用 nodejs javascript 我想根据 url 参数动态更改字母 我努力了 letter req params letter
  • RxJava Observable.fromEmitter 奇怪的背压行为

    我一直在利用Observable fromEmitter 作为一个绝佳的替代品Observable create 我最近遇到了一些奇怪的行为 但我不太明白为什么会出现这种情况 我真的很感谢对背压和调度程序有一定了解的人来看看这个 publi