RxJava重试时出现奇怪的行为

2024-01-31

我正在玩 RxJava重试时间 http://reactivex.io/documentation/operators/retry.html操作员。网上关于它的资料很少,唯一值得一提的是this http://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained。这也不足以探索我想了解的各种用例。我还加入了异步执行和回退重试,以使其更加现实。

我的设置很简单:我有一堂课ChuckNorrisJokesRepository从 JSON 文件返回随机数量的 Chuck Norris 笑话。我的测试类是ChuckNorrisJokesService如下所示。我感兴趣的用例如下:

  1. 第一次尝试成功(无需重试)
  2. 重试 1 次后失败
  3. 尝试重试 3 次,但第二次成功,因此不会重试第三次
  4. 第三次重试成功

Note: 该项目可以在我的GitHub https://github.com/asarkar/java/blob/master/concurrency-learning/src/main/java/org/abhijitsarkar/rx/ChuckNorrisJokesService.java.

ChuckNorrisJokesService.java:

@Slf4j
@Builder
public class ChuckNorrisJokesService {
    @Getter
    private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());

    private final Scheduler scheduler;
    private final ChuckNorrisJokesRepository jokesRepository;
    private final CountDownLatch latch;
    private final int numRetries;
    private final Map<String, List<String>> threads;

    public static class ChuckNorrisJokesServiceBuilder {
        public ChuckNorrisJokesService build() {
            if (scheduler == null) {
                scheduler = Schedulers.io();
            }

            if (jokesRepository == null) {
                jokesRepository = new ChuckNorrisJokesRepository();
            }

            if (threads == null) {
                threads = new ConcurrentHashMap<>();
            }

            requireNonNull(latch, "CountDownLatch must not be null.");

            return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
        }
    }

    public void setRandomJokes(int numJokes) {
        mergeThreadNames("getRandomJokes");

        Observable.fromCallable(() -> {
            log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
            mergeThreadNames("fromCallable");
            latch.countDown();

            List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
            log.debug("fromCallable - after call. Latch: {}.", latch.getCount());

            return randomJokes;
        }).retryWhen(errors ->
                errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
                    log.debug("retryWhen. retryCount: {}.", retryCount);
                    mergeThreadNames("retryWhen");

                    return Observable.timer(retryCount, TimeUnit.SECONDS);
                }))
                .subscribeOn(scheduler)
                .subscribe(j -> {
                            log.debug("onNext. Latch: {}.", latch.getCount());
                            mergeThreadNames("onNext");

                            jokes.set(new Jokes("success", j));
                            latch.countDown();
                        },
                        ex -> {
                            log.error("onError. Latch: {}.", latch.getCount(), ex);
                            mergeThreadNames("onError");
                        },
                        () -> {
                            log.debug("onCompleted. Latch: {}.", latch.getCount());
                            mergeThreadNames("onCompleted");

                            latch.countDown();
                        }
                );
    }

    private void mergeThreadNames(String methodName) {
        threads.merge(methodName,
                new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
                (value, newValue) -> {
                    value.addAll(newValue);

                    return value;
                });
    }
}

为简洁起见,我将仅显示第一个用例的 Spock 测试用例。看我的GitHub https://github.com/asarkar/java/blob/master/concurrency-learning/src/test/groovy/org/abhijitsarkar/rx/ChuckNorrisJokesServiceSpec.groovy对于其他测试用例。

def "succeeds on 1st attempt"() {
    setup:
    CountDownLatch latch = new CountDownLatch(2)
    Map<String, List<String>> threads = Mock(Map)
    ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
            .latch(latch)
            .threads(threads)
            .build()

    when:
    service.setRandomJokes(3)
    latch.await(2, TimeUnit.SECONDS)

    Jokes jokes = service.jokes.get()

    then:
    jokes.status == 'success'
    jokes.count() == 3

    1 * threads.merge('getRandomJokes', *_)
    1 * threads.merge('fromCallable', *_)
    0 * threads.merge('retryWhen', *_)
    1 * threads.merge('onNext', *_)
    0 * threads.merge('onError', *_)
    1 * threads.merge('onCompleted', *_)
}

这失败了:

Too few invocations for:

1 * threads.merge('fromCallable', *_)   (0 invocations)
1 * threads.merge('onNext', *_)   (0 invocations)

我期待的是fromCallable被调用一次,就成功了,onNext被调用一次,然后是onCompleted。我缺少什么?

P.S.:完全披露 - 我也将这个问题发布在RxJava GitHub https://github.com/ReactiveX/RxJava/issues/4207.


经过几个小时的故障排除并在 ReactiveX 成员 David Karnok 的帮助下,我解决了这个问题。

retryWhen是一个复杂的,甚至可能是有缺陷的操作符。官方文档和至少一个答案在这里使用range运算符,如果没有重试,则立即完成。看我的讨论 https://github.com/ReactiveX/RxJava/issues/4207与大卫·卡诺克。

该代码可以在我的GitHub https://github.com/asarkar/java/blob/master/concurrency-learning/src/main/java/org/abhijitsarkar/rx/ChuckNorrisJokesService.java完成以下测试用例:

  1. 第一次尝试成功(无需重试)
  2. 重试 1 次后失败
  3. 尝试重试 3 次,但第二次成功,因此不会重试第三次
  4. 第三次重试成功
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava重试时出现奇怪的行为 的相关文章

随机推荐

  • PyQt5 QWebEngineView不显示网页

    The part where webpage should be rendered gets white for a fraction of second and then gets empty 这是我的代码 基本上是https www p
  • 构建两个独立数据库集成的最佳方法?

    我在工作中遇到了以下问题 我没有经验或知识来回答这些问题 我希望你们中的一些明智的人能够为我指明正确的方向 任何答案将不胜感激 Scenario 实施立面图案 http en wikipedia org wiki Facade patter
  • 捕获 async void 方法抛出的异常

    使用 Microsoft for NET 的异步 CTP 是否可以在调用方法中捕获异步方法抛出的异常 public async void Foo var x await DoSomethingAsync Handle the result
  • 使用 Python 或其他方法从 PDF 中提取指向另一个 PDF 中页面的链接

    我有 5 个 PDF 文件 每个文件都有指向另一个 PDF 文件中不同页面的链接 这些文件都是大型 PDF 的目录 每个大约 1000 页 使得手动提取成为可能 但非常痛苦 到目前为止 我已尝试在 Acrobat Pro 中打开该文件 我可
  • 如何使用 xsl 1.0 查找最小值和最大值?

    文件 1 xml
  • 如何找到货币的 html 代码?

    美元 的html代码是 36 我如何找到其他货币的 html 代码 多谢 这里有一些 测试 Web 浏览器中的 Unicode 支持 货币符号 http www alanwood net unicode currency symbols h
  • Android 在主屏幕上创建快捷方式

    我想做的是 1 我在一个活动中 有 2 个按钮 如果我单击第一个 则会在主屏幕中创建快捷方式 快捷方式打开一个html页面之前已经下载过 所以我希望它使用默认浏览器 但我不想使用互联网 因为我已经有了该页面 2 第二个按钮创建另一个启动活动
  • 如何刷新 HTML bag 上的 Canvas?

    我有一个 javascript 程序 可以在屏幕上绘制一百个圆圈 它们可以在画布上自行弹跳 目前 我在它们上面画了一个空矩形以擦除它们的下一代 但是有没有更好的方法来擦除和刷新 HTML 页面上的 Canvas Code function
  • Webpack Karma Istanbul 重新映射 TypeScript

    我正在开发一个客户端应用程序 但在创建正确的 Karma 配置时遇到问题 现在 我的设置如下 Webpack 使用 ts loader 编译 TypeScript 资产等 Karma 使用 webpack 插件 加载 Webpack 配置
  • 使用 javascript 将换行符替换为空格

    我想看看是否可以阻止回车键并将其替换为空格 我还使用表单验证仅允许字母 数字和其他一些特定字符 例如美元符号 减号和句点等 这是该代码 我想看看是否可以将它们合并为一个 并能够检查验证并将按键替换为一个代码 调用中的空格
  • Weka 高斯过程算法中的错误:乘法仅适用于双精度数

    我有这个数据集 我想通过请求 API 将 weka 算法应用于它 RELATION dataset ATTRIBUTE timestamp DATE yyyy MM dd HH mm ss z ATTRIBUTE action scale
  • 在后退按钮上单击执行功能。

    On document ready我有一个函数 它收集具有 wordNum 属性的 html 控件 我发出一个 AJAX 请求 该请求返回每个控件的一些描述 然后我设置这些控件及其innerhtml带有返回描述的属性 问题是 如果用户单击后
  • 如何修复 python 请求中的 错误?

    我正在使用一个 API 它接收 pdf 文件并进行一些分析 但我总是收到 Response 500 最初使用 Postman 进行测试 请求通过 收到带有相应 JSON 信息的响应 200 应关闭 SSL 安全性 但是 当我尝试通过 Pyt
  • UIActivityViewController 在 iOS 7 上不显示 FB 和 Twitter

    我正在尝试使用 UIActivityViewController 分享一些项目 在 iOS 6 上它运行良好 但是当我在 iOS 7 上测试它时 只显示邮件图标 由于怀疑我的 SDK 太旧 我下载了最新的 SDK 但它的行为仍然相同 我在模
  • 在 Android Studio 中更改矢量资源的填充颜色

    Android Studio 现在支持 21 上的矢量资源 并将在编译时为较低版本生成 png 我有一个矢量资源 来自材质图标 我想更改填充颜色 这适用于 21 但生成的 png 不会改变颜色 有没有办法做到这一点
  • 从 VB6 向 MS Access 插入订单

    我有一个用于培训管理的旧 VB 应用程序 它是用 VB6 编写的 数据库是 MsAccess 当我使用该应用程序时 在保存培训课程时 所有记录都保存在以前的记录之间 不按顺序 它没有添加到最后一行 该应用程序还从数据库中获取数据并将其显示在
  • 如何从日期选择器设置区域设置格式?

    如何根据用户的区域设置在 jQuery UI 的日期选择器中设置日期格式 我得到源html
  • 尝试从 Azure 检索数据时出现 MobileServiceInvalidOperationException

    这是我正在使用的方法 try List
  • 在laravel中安装vue 3.0

    有没有办法将 vue 3 0 安装到 Laravel 8 当我跑步时 npm install vue next 它开始安装Vue 3 0 但由于某种原因它也开始安装vue template compilerv2 6 12 出现以下内容 Ad
  • RxJava重试时出现奇怪的行为

    我正在玩 RxJava重试时间 http reactivex io documentation operators retry html操作员 网上关于它的资料很少 唯一值得一提的是this http blog danlew net 201