不同之处在于,对于来自下游的单个订阅,顶部变压器将订阅上游两次,从而复制通常不需要的上游的任何副作用:
Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
.doOnSubscribe(s -> System.out.println("Subscribed!"));
mixedSource.compose(f ->
Observable.merge(
f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
)
)
.subscribe(System.out::println);
将打印
Subscribed!
2
3
4
Subscribed!
A
B
C
这里代表的副作用是打印输出Subscribed!
根据真实源中的实际工作,这可能意味着发送电子邮件两次,检索表的行两次。通过这个特定示例,您可以看到,即使源值的类型交错,输出也会单独包含它们。
相比之下,publish(Function)
将为每个最终订阅者建立一个对源的订阅,因此源上的任何副作用只会发生一次。
mixedSource.publish(f ->
Observable.merge(
f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
)
)
.subscribe(System.out::println);
打印
Subscribed!
A
2
B
3
C
4
因为源被订阅一次并且每个项目被多播到该的两个“臂”.ofType().compose()
.