这是因为事实上这是两个独立的可观察量。当您调用时它们会“生成”subscribe()
。因此,您提供的步骤是不正确的,因为步骤 3 和 4 只是 1 和 2,但基于不同的可观察值。
但由于发生日志记录的线程,您将它们视为 1 1 1 2 2 2。如果您要删除observeOn()
然后你会看到以交织的方式排放。要查看以下运行代码:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
Observable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation());
//.observeOn(single);
dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));
Thread.sleep(1000);
}
输出,至少在我的运行中是(注意线程名称):
1 RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580
如果你应用observeOn()
它成为了:
1 RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889
正如您正确指出的那样,要获得您想要的东西,您需要publish().refcount()
或者简单地share()
(它是一个别名)运算符。
这是因为publish()
创建一个ConnectableObservable
它不会开始发射物品,直到通过connect()
方法。在这种情况下,如果你这样做:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
ConnectableObservable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation())
.observeOn(single)
.publish();
dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + (i - l)));
Thread.sleep(1000);
dataStream.connect();
Thread.sleep(1000);
}
你会注意到,在第一秒(第一Thread.sleep()
调用)什么也没有发生,就在之后dataStream.connect()
称为排放发生。
refCount()
接收 ConnectableObservable 并对订阅者隐藏调用的需要connect()
通过计算当前有多少订阅者订阅。它的作用是在第一次订阅时调用connect()
最后一次取消订阅后,取消原始可观察值的订阅。
至于相互取消publish().autoConnect()
,之后你确实得到了一个可观察量,但它有一个特殊的属性,假设原始可观察量通过互联网进行 API 调用(持续 10 秒),当你在没有使用它的情况下使用它时share()
您最终将向服务器发出与这 10 秒内的订阅数量一样多的并行查询。另一方面与share()
您将只有一个电话。
如果共享的可观察量非常快地完成其工作(例如just(1,2,3)
).
autoConnect()
/refCount()
为您提供一个您订阅的中间可观察值,而不是原始可观察值。
如果您有兴趣深入了解这本书:使用 RxJava 进行响应式编程 https://www.safaribooksonline.com/library/view/reactive-programming-with/9781491931646/