我有一个具有频繁值的流和一个具有较慢值的流。我想将它们组合起来,但仅当较慢的发出时才发出一个值。所以combineLatest
不起作用。
就像这样:
a1
a2
b1
(a2,b1)
a3
a4
a5
b2
(a5,b2)
目前我正在这样做,有没有更干净的方法?
withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] =
Observable({ o =>
var last : A
fast.subscribe({a => last = a})
slow.subscribe({b => o.onNext((last,b))})
})
edit:该运算符现在处于 Rx 中,称为与最新来自 https://github.com/ReactiveX/RxJava/pull/2760.
您正在寻找的是一个我称之为“combinePrev”的组合器,它在 API 中不存在,但在许多情况下却非常必要。sample
运算符很接近,但它不会合并两个流。我还错过了 RxJS 中的“combinePrev” https://github.com/Reactive-Extensions/RxJS/issues/335。事实证明,“combinePrev”(“withLatest”)的实现很简单,只依赖于 map 和 switch:
withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = {
val hotSlow = slow.publish.refCount
fast.map({a => hotSlow.map({b => (a,b)})}).switch
}
这里有一个jsfiddle http://jsfiddle.net/staltz/2orcypoz/3/在 RxJS 中实现的相同运算符的示例。
虽然运算符不在 Rx 中,但您可以使用隐式类,这样您就可以使用slow.withLatest(fast)
:
implicit class RXwithLatest[B](slow: Observable[B]) {
def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */
}
Note: slow
必须是hot
. If slow
是一个冷的 Observable,withLatest
不起作用。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)