正如我原来的问题中所述(参见将相互依赖的事件流与 RX.Net 关联起来 https://stackoverflow.com/questions/26640548/correlate-interdependent-event-streams-with-rx-net/26647781)我有一个 RX.net 事件流,只要未触发某个其他事件,它就只能调用观察者的 OnNext 方法(基本上是“只要系统连接,就处理 Change-* 事件,在断开连接时暂停并重新启动”系统重新连接后处理 Change-* 事件)。
然而,虽然这对于新事件来说很顺利,但我如何取消/发出取消信号ongoing.OnNext() 调用?
由于您的观察者已经被写入接受CancellationToken
,我们只需修改您的 Rx 流即可提供事件数据。我们将使用 RxCancellationDisposable
每当流被取消订阅时我们都会处理掉它。
// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)
{
return Observable.Using(
() => new CancellationDisposable(),
cts => source.Select(item => Tuple.Create(cts.Token, item)));
}
将其与另一个问题的解决方案放在一起:
DataSourceLoaded
.SelectMany(_ => DataSourceFieldChanged
.Throttle(x)
.CancelOnUnsubscribe()
.TakeUntil(DataSourceLoaded))
.Subscribe(c => handler(c.Item1, c.Item2));
当。。。的时候TakeUntil
子句被触发,它将取消订阅CancelOnUnsubscribe
observable,这将依次处理CancellationDisposable
并导致令牌被取消。您的观察者可以监视此令牌并在发生这种情况时停止其工作。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)