Rxjava重试时立即调用

2024-01-31

我对 rxjava 有一个非常具体的问题或误解,希望有人可以帮助解决。

我正在运行 rxjava 2.1.5 并有以下代码片段:

public static void main(String[] args) {

    final Observable<Object> observable = Observable.create(emitter -> {
        // Code ... 
    });

    observable.subscribeOn(Schedulers.io())
            .retryWhen(error -> {
                System.out.println("retryWhen");
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"));

}

执行后,程序打印:

retryWhen

Process finished with exit code 0

我的问题,我不明白的是:为什么在订阅 Observable 后立即调用 retryWhen ?可观察到的东西什么也不做。

我想要的是在发射器上调用 onError 时调用 retryWhen 。我是否误解了 rx 的工作原理?

Thanks!

添加新片段:

public static void main(String[] args) throws InterruptedException {

    final Observable<Object> observable = Observable.create(emitter -> {
        emitter.onNext("next");
        emitter.onComplete();
    });

    final CountDownLatch latch = new CountDownLatch(1);
    observable.subscribeOn(Schedulers.io())
            .doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
            .retryWhen(error -> {
                System.out.println("retryWhen: " + error.toString());
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"),
                         () -> latch.countDown());

    latch.await();
}

发射器 onNext 和complete 被调用。 DoOnError 永远不会被调用。输出是:

重试时间:io.reactivex.subjects.SerializedSubject@35fb3008 订阅下一个

进程已完成,退出代码为 0


retryWhen当一个Observer订阅它,这样你就有了一个主序列,伴随着一个发出Throwable主序列失败了。你应该在上面编写一个逻辑Observable你进入这个Function所以最后,一Throwable将在另一端产生一个值。

Observable.error(new IOException())
    .retryWhen(e -> {
         System.out.println("Setting up retryWhen");
         int[] count = { 0 };
         return e
            .takeWhile(v -> ++count[0] < 3)
            .doOnNext(v -> { System.out.println("Retrying"); });
    })
    .subscribe(System.out::println, Throwable::printStackTrace);

自从e -> { }函数体是为每个单独的订阅者执行的,您可以安全地拥有每个订阅者的状态,例如重试计数器。

Using e -> e.retry()没有效果,因为输入错误流永远不会得到它的onError called.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rxjava重试时立即调用 的相关文章

随机推荐