我对 rxjava 有一个非常具体的问题或误解,希望有人可以帮助解决。
我正在运行 rxjava 2.1.5 并有以下代码片段:
public static void main(String[] args) {
final Observable<Object> observable = Observable.create(emitter -> {
// Code ...
});
observable.subscribeOn(Schedulers.io())
.retryWhen(error -> {
System.out.println("retryWhen");
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"));
}
执行后,程序打印:
retryWhen
Process finished with exit code 0
我的问题,我不明白的是:为什么在订阅 Observable 后立即调用 retryWhen ?可观察到的东西什么也不做。
我想要的是在发射器上调用 onError 时调用 retryWhen 。我是否误解了 rx 的工作原理?
Thanks!
添加新片段:
public static void main(String[] args) throws InterruptedException {
final Observable<Object> observable = Observable.create(emitter -> {
emitter.onNext("next");
emitter.onComplete();
});
final CountDownLatch latch = new CountDownLatch(1);
observable.subscribeOn(Schedulers.io())
.doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
.retryWhen(error -> {
System.out.println("retryWhen: " + error.toString());
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"),
() -> latch.countDown());
latch.await();
}
发射器 onNext 和complete 被调用。 DoOnError 永远不会被调用。输出是:
重试时间:io.reactivex.subjects.SerializedSubject@35fb3008
订阅下一个
进程已完成,退出代码为 0