RxJava2 发布

2023-12-29

有什么区别

ObservableTransformer {
    Observable.merge(
        it.ofType(x).compose(transformerherex),
        it.ofType(y).compose(transformerherey)
    )
}

and

ObservableTransformer {
    it.publish{ shared ->
        Observable.merge(
            shared.ofType(x).compose(transformerherex),
            shared.ofType(y).compose(transformerherey)
        )
    }
}

当我使用这两个运行我的代码时,我得到了相同的结果。发布在这里做什么。


不同之处在于,对于来自下游的单个订阅,顶部变压器将订阅上游两次,从而复制通常不需要的上游的任何副作用:

Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
      .doOnSubscribe(s -> System.out.println("Subscribed!"));


mixedSource.compose(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

将打印

Subscribed!
2
3
4
Subscribed!
A
B
C

这里代表的副作用是打印输出Subscribed!根据真实源中的实际工作,这可能意味着发送电子邮件两次,检索表的行两次。通过这个特定示例,您可以看到,即使源值的类型交错,输出也会单独包含它们。

相比之下,publish(Function)将为每个最终订阅者建立一个对源的订阅,因此源上的任何副作用只会发生一次。

mixedSource.publish(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

打印

Subscribed!
A
2
B
3
C
4

因为源被订阅一次并且每个项目被多播到该的两个“臂”.ofType().compose().

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava2 发布 的相关文章