RxJava:如何组合多个具有依赖关系的Observables并在最后收集所有结果?

2024-01-08

我正在学习 RxJava,作为我的第一个实验,尝试重写第一个中的代码run()中的方法这段代码 https://gist.github.com/benjchristensen/4671081(引用于Netflix 的博客 http://techblog.netflix.com/2013/02/rxjava-netflix-api.html作为 RxJava 可以帮助解决的问题),使用 RxJava 提高其异步性,即它不会等待第一个 Future 的结果(f1.get()),然后再继续执行其余代码。

f3依赖于取决于f1。我知道如何处理这个问题flatMap似乎可以解决这个问题:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

Next, f4 and f5取决于f2。我有这个:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

这开始变得奇怪(mergeing它们可能不是我想要的......)但允许我最后这样做,而不完全是我想要的:

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

这给了我:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

这是所有的数字,但不幸的是我在单独的调用中得到了结果,所以我不能完全替换原始代码中的最终 println :

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

我不明白如何在同一行上访问这两个返回值。我想我可能缺少一些函数式编程的东西。我怎样才能做到这一点?谢谢。


看来您真正需要的只是更多关于 RX 如何使用的鼓励和观点。我建议您更多地阅读文档和弹珠图(我知道它们并不总是有用)。我还建议研究一下lift()函数和运算符。

  • 可观察的全部要点是将数据流和数据操作连接到单个对象中
  • 调用要点map, flatMap and filter是操纵数据流中的数据
  • 合并的重点是合并数据流
  • 运算符的目的是允许您中断稳定的可观察数据流并在数据流上定义您自己的操作。例如,我编写了一个移动平均运算符。总结起来就是这样n doubles 在 Observable of doubles 中返回移动平均线流。代码看起来像这样

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

您会松一口气,因为许多您认为理所当然的过滤方法都有lift()在引擎盖下。

照这样说;合并多个依赖项所需要做的就是:

  • 使用将所有传入数据更改为标准数据类型map or flatMap
  • 将标准数据类型合并到流中
  • 如果一个对象需要等待另一个对象,或者需要对流中的数据进行排序,请使用自定义运算符。注意:这种方法会减慢流速度
  • 用于列出或订阅来收集所有这些数据
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava:如何组合多个具有依赖关系的Observables并在最后收集所有结果? 的相关文章

随机推荐