RxJava,一个可观察多个订阅者:publish().autoConnect()

2024-02-25

我正在使用 rxJava/rxAndroid,但有一些非常基本的东西没有按照我的预期运行。 我有一个可观察对象和两个订阅者:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));

这是输出:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3

现在,我知道我可以通过使用来避免重复计数publish().autoConnect()但我首先尝试理解这种默认行为。 每次有人订阅可观察对象时,它就会开始发出数字序列。我明白了。那么,当什么时候Subscriber 1连接它开始发射物品。Subscriber 2立即连接,为什么它没有获取值?

我是这样理解的,从可观察的角度来看:

  1. 有人订阅了我,我应该开始发布物品
    [订阅者:1][要发出的项目:1,2,3]

  2. 向订阅者发送项目“1”
    [订阅者:1][要发出的项目:2,3]

  3. 其他人订阅了我,完成后我会再次发出 1,2,3
    [订阅者:1 和 2][要发出的项目:2,3,1,2,3]

  4. 向订阅者发送项目“2”
    [订阅者:1 和 2][要发出的项目:3,1,2,3]

  5. 向订阅者发送项目“3”
    [订阅者:1 和 2][要发出的项目:1,2,3]

  6. 向订阅者发送项目“1”
    [订阅者:1 和 2][要发出的项目:2,3]

  7. ...

但这不是它的工作原理。 就好像它们是两个独立的可观察量合而为一。这让我很困惑,为什么他们不向所有订阅者提供这些物品?

Bonus:

那个怎么样publish().autoConnect()解决问题了吗? 让我们来分解一下。publish()给我一个可连接的可观察的。可连接的可观察量就像常规的可观察量一样,但您可以告诉它何时连接。然后我继续通过调用告诉它立即连接autoConnect()

这样做……我不就得到了和开始时一样的结果吗?一个普通的正则可观察量。运营商似乎互相取消了。

我可以闭嘴并使用publish().autoconnect()。但我想更多地了解可观察量是如何工作的。

Thanks!


这是因为事实上这是两个独立的可观察量。当您调用时它们会“生成”subscribe()。因此,您提供的步骤是不正确的,因为步骤 3 和 4 只是 1 和 2,但基于不同的可观察值。

但由于发生日志记录的线程,您将它们视为 1 1 1 2 2 2。如果您要删除observeOn()然后你会看到以交织的方式排放。要查看以下运行代码:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    Observable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation());
                    //.observeOn(single);

    dataStream.subscribe(i -> System.out.println("1  " + Thread.currentThread().getName() + " " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + Thread.currentThread().getName() + " " + (i - l)));

    Thread.sleep(1000);
}

输出,至少在我的运行中是(注意线程名称):

1  RxComputationThreadPool-1 135376988
2  RxComputationThreadPool-2 135376988
1  RxComputationThreadPool-1 135486815
2  RxComputationThreadPool-2 135537383
1  RxComputationThreadPool-1 135560691
2  RxComputationThreadPool-2 135617580

如果你应用observeOn()它成为了:

1  RxSingleScheduler-1 186656395
1  RxSingleScheduler-1 187919407
1  RxSingleScheduler-1 187923753
2  RxSingleScheduler-1 186656790
2  RxSingleScheduler-1 187860148
2  RxSingleScheduler-1 187864889

正如您正确指出的那样,要获得您想要的东西,您需要publish().refcount()或者简单地share()(它是一个别名)运算符。

这是因为publish()创建一个ConnectableObservable它不会开始发射物品,直到通过connect()方法。在这种情况下,如果你这样做:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    ConnectableObservable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation())
                    .observeOn(single)
                    .publish();

    dataStream.subscribe(i -> System.out.println("1  " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + (i - l)));

    Thread.sleep(1000);
    dataStream.connect();
    Thread.sleep(1000);

}

你会注意到,在第一秒(第一Thread.sleep()调用)什么也没有发生,就在之后dataStream.connect()称为排放发生。

refCount()接收 ConnectableObservable 并对订阅者隐藏调用的需要connect()通过计算当前有多少订阅者订阅。它的作用是在第一次订阅时调用connect()最后一次取消订阅后,取消原始可观察值的订阅。

至于相互取消publish().autoConnect(),之后你确实得到了一个可观察量,但它有一个特殊的属性,假设原始可观察量通过互联网进行 API 调用(持续 10 秒),当你在没有使用它的情况下使用它时share()您最终将向服务器发出与这 10 秒内的订阅数量一样多的并行查询。另一方面与share()您将只有一个电话。

如果共享的可观察量非常快地完成其工作(例如just(1,2,3)).

autoConnect()/refCount()为您提供一个您订阅的中间可观察值,而不是原始可观察值。

如果您有兴趣深入了解这本书:使用 RxJava 进行响应式编程 https://www.safaribooksonline.com/library/view/reactive-programming-with/9781491931646/

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava,一个可观察多个订阅者:publish().autoConnect() 的相关文章

随机推荐

  • 停止 shell 通配符扩展?

    有没有办法让编译后的命令行程序告诉 bash 或 csh 它不希望在其参数中扩展任何通配符 例如 人们可能需要一个 shell 命令 例如 foo 简单地返回该字符的 ASCII 数字值 不会 扩展发生在命令实际运行之前 您只能在运行命令之
  • 如何在报告中插入两页

    我面临一个问题 还有两个问题jrmxl文件 我想加入其中pdf文件 但每个都在一页中 我看到了下面的一些提示 但我不知道它们是否是最好的 因为我的第一个文件有 3 个频段 title detail and summary 第二个有detai
  • 如何获取 UITableView 标签文本字符串 - 自定义单元格

    我有一个带有自定义单元格的 UITableView 自定义单元格包含 UILabel 和 UIImageView 我在网上看到 当用户按下单元格时 可以从普通的 UITableView 单元格获取文本并将其存储在字符串中 但是 当您使用自定
  • 如何从不同的范围创建对象

    我在 Guice 中有一个范围单例的对象 在方法中f 我想创建一个新对象 但让 Guice 进行注入 我认为传递注射器并不是一个好的做法 那么我怎样才能获得一个新的 Guicy 对象实例呢 正如上面所建议的 提供商可能是做到这一点的方法 这
  • 功能检测自动播放 HTML5 音频 - 移动浏览器上的音频

    因此 我有一个网站 用户希望演示服务器端脚本生成的音频输出 他们选择一些选项并点击创建按钮 然后我在 HTML5 音频元素中进行 AJAX 并将 autoplay 属性设置为 true 这在桌面上效果很好 但在移动设备上效果不佳 到目前为止
  • 动态引用 Excel 工作表

    我有一个应该很简单的问题 但我没有解决它 我为一家商店打印了价目表 今年他们将零件编号分成了 5 张工作表 而不是一张 当用户想要打印价格标签时 她在 C10 中输入 单击工作表 价格表 并导航到她需要的零件号 C10 的计算公式为 价目表
  • 如何以编程方式从类的方法之一中查找类的公共属性

    我有课Foo具有公共和受保护的财产 Foo需要有一个非静态方法 getPublicVars 返回所有公共属性的列表Foo 这只是一个例子 我从outside the Foo对象调用get object vars http php net g
  • 使用主机系统上的客户端访问在虚拟机中运行的 HBase

    我尝试使用客户端程序将一些数据写入hbase HBase Hadoop 在 Cloudera ubuntu 的预配置虚拟机中运行 客户端运行在托管虚拟机的系统上 并直接在虚拟机中运行客户端 所以现在想使用vm外的客户端来访问vm上的服务器
  • 是否可以从命令行启动 IE 的代理设置对话框?

    有没有办法从 Windows 命令行启动 IE 代理设置对话框 以节省在任何应用程序中浏览菜单的时间 我发现了另一个更短的 inetcpl cpl 4 您可以在运行框或命令提示符中使用它
  • IE11 + Angular 1.5.11 上奇怪的渲染行为

    我们目前正在 Angular 版本 1 5 11 中开发一个应用程序 现在它已经变得相当大 数百个控制器等 我们偶然发现了 Internet Explorer 11 中的一个问题 一段时间后 有时是几分钟 有时是几个小时 页面开始出现渲染故
  • Visual Studio 无法识别我的网络摄像头激光测距仪代码的 MFC 库

    我尝试直接从互联网复制源代码 但由于下面发现的错误 我无法构建 调试整个文件 请帮忙 Error occurred while restoring NuGet packages System ArgumentException The pa
  • 如何使用 gdb 调试进程而不暂停它?

    我有一个已经在运行的进程 我想用 GDB 调试它 我一直在使用 gdb pid PID 但是 当我这样做时 该过程会暂停 我想附加到进程而不暂停它 并在它仍在运行时在其内存中查看 这可能吗 或者 有没有办法 分叉 该进程 以便我可以查看其内
  • canOpenUrl 失败,但 openUrl 成功

    我面临一个奇怪的问题 我正在使用 xcode 7 2 iOS 9 在真实设备 iphone 4S 不是模拟器 上工作 我有 2 个应用程序 app1 和 app2 app1 应该使用 url 方案将数据发送到 app2 app2已经很好地声
  • 为什么Python中只有主线程可以设置信号处理程序

    在Python的信号处理语义中 只有主线程可以设置信号处理程序 并且只有主线程可以调用信号处理程序 为什么要这样设计呢 此注释出现在 cpython 源文件中信号模块 c https github com python cpython bl
  • 无手拒绝错误:交易查询已完成 - knex、express.js

    我试图首先检查表中的值 如果存在 则删除另一个表中的行并将此新数据插入该表中 我使用了带有 select del 和 insert 命令的事务 db transaction trx gt return trx users where use
  • 没有为实体指定标识符/主键(...)每个实体都必须具有标识符/主键

    I have Peticion实体 但缺少某些内容 因为出现以下错误 No identifier primary key specified for Entity Every Entity must have and identifier
  • cocos2d-x android 设置错误 - java.lang.NullPointerException

    我正在尝试设置适用于 Android 的 cocos2d x我跟着 我通过了终端的步骤 没有任何问题 setup py命令结果符合预期 我的问题是在我设置之后NDK ROOT in C C 构建 环境部分 我得到一些java lang Nu
  • 如何修复猫鼬中的“.create 不是函数”错误

    我正在尝试初始化 Seed js 文件以在我的数据库中启动一些内容 但是当我运行时node bin seed js我不断得到 TypeError Celebrity create is not a function 我已尝试重新安装mong
  • 如何使用滑块同步两个树视图中的滚动

    我正在使用 Visual Studio 2010 C 和 Windows 窗体应用程序 我有两个并排的树视图 并且我已经弄清楚如何使用滚动条上的向上 向下按钮同步滚动 但是当我使用滑块时 它不会移动另一个树视图 我采取了一个有效的列表视图示
  • RxJava,一个可观察多个订阅者:publish().autoConnect()

    我正在使用 rxJava rxAndroid 但有一些非常基本的东西没有按照我的预期运行 我有一个可观察对象和两个订阅者 Observable