我一直在利用Observable.fromEmitter()
作为一个绝佳的替代品Observable.create()
。我最近遇到了一些奇怪的行为,但我不太明白为什么会出现这种情况。我真的很感谢对背压和调度程序有一定了解的人来看看这个。
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
该应用程序的输出是
Received 1
Received 2
...
Received 128
然后它仍然停留在 128(大概是因为这是 RxJava 的默认缓冲区大小)。
如果我更改指定的模式fromEmitter()
to BackpressureMode.NONE
,那么代码将按预期工作。如果我删除对observeOn()
,它也按预期工作。有谁能够解释为什么会出现这种情况吗?
这是同池死锁的情况。subscribeOn
安排下游request
在它正在使用的同一线程上,但如果该线程正忙于睡眠/发射循环,则请求永远不会传递到fromEmitter
因此一段时间后LATEST
如果主源等待足够长的时间,就会开始删除元素,直到最后一个值 (999) 被传递为止。 (这与以下情况类似onBackpressureBlock
我们删除了。)
If subscribeOn
如果没有执行此请求调度,该示例将正常工作。
我已经打开了an issue https://github.com/ReactiveX/RxJava/issues/4735制定解决方案。
目前的解决方法是使用更大的缓冲区大小observeOn
(有过载)或使用fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)