例如,
给定一定数量 (m) 的数字流 (m1, m2, m3, m4, m5, m6...),并对前 n 个项目应用变换 (2 * i)(n 可以小于、等于或大于 m),对其余项目应用另一个变换 (3 * i)。和
返回结果:m1*2、m2*2、m3*3、m4*3、m5*3、m6*3...(此处假设 n=2)。
我试图使用 take(n) 和skip(n),然后使用 concatwith,但看起来 take(n) 会删除序列中的剩余项目,并在之后使skip(n) 什么都不返回。
您可以分享您的 m 的直播,然后重新合并在一起take()
and skip()
流,像这样:
int m = 10;
int n = 8;
Observable<Integer> numbersStream = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.publish();
Observable<Integer> firstNItemsStream = numbersStream.take(n)
.map(i -> i * 2);
Observable<Integer> remainingItemsStream = numbersStream.skip(n)
.map(i -> i * 3);
Observable.merge(firstNItemsStream, remainingItemsStream)
.subscribe(integer -> System.out.println("result = " + integer));
numbersStream.connect();
EDIT:
正如@A.E. 所指出的。芫,share()
将开始与第一个订阅者一起发送,因此如果 Observable 已经开始发送项目,第二个订阅者可能会错过通知,因此在这种情况下还有其他可能性:
cache()
- 将回复所有缓存发出的项目并将其回复给每个新订阅者,但会牺牲取消订阅的能力,因此需要谨慎使用。
reply().refCount()
- 将创建Observable
that reply()
每个新订阅者的所有先前项目(类似于缓存),但当最后一个订阅者取消订阅时,将取消订阅。
在这两种情况下,都应考虑内存Observable
将在内存中缓存所有发出的项目。
publish()
- 在不缓存所有先前项目的情况下,另一种可能性是使用publish()
创造ConnectableObservable
,并称之为connect()
方法在所有所需订阅者订阅后开始发射,从而获得同步并且所有订阅者将正确收到所有通知。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)