如何将取消令牌插入现有的IObservable
调用前的管道Publish
在其上(即,在它成为IConnectableObservable
)?
在订阅它之前,这必须是冷可观察管道的一部分(否则,我可以传递一个CancellationToken
令牌到IObservable
's Subscribe
, RunAsync
, ToTask
etc).
有推荐的模式吗?
我可以考虑使用TakeUntil https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh229530(v=vs.103)按照 Theodor Zoulias 的建议实现这一目标here https://stackoverflow.com/q/66630603/1768303。例如:
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
async Task Test(CancellationToken token)
{
var publishedSequence = Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Do(n => Console.WriteLine($"Emitting: {n}"))
.Skip(3)
.TakeUntil(
Observable.Create<long>(
observer => token.Register(
(_, token) => observer.OnError(new OperationCanceledException(token)),
null)))
.Finally(() => Console.WriteLine($"Finally"))
.Publish();
using var subscription = publishedSequence.Subscribe(
onNext: n => Console.WriteLine($"OnNext: {n}"),
onError: e => Console.WriteLine($"OnError: {e}"),
onCompleted: () => Console.WriteLine("OnCompleted"));
using var connection = publishedSequence.Connect();
await publishedSequence.ToTask();
}
var cts = new CancellationTokenSource(1000);
await Test(cts.Token);
Ouput:
Emitting: 0
Emitting: 1
Emitting: 2
Emitting: 3
OnNext: 3
Emitting: 4
OnNext: 4
Emitting: 5
OnNext: 5
Emitting: 6
OnNext: 6
Emitting: 7
OnNext: 7
Emitting: 8
OnNext: 8
OnError: System.OperationCanceledException: The operation was canceled.
Finally
我还有一个自定义运算符的原型,WithCancellation
,这基本上是一个传递IObservable
还监听取消信号。不过,我宁愿坚持使用标准方法。
Updated,我想我已经找到了竞争条件TakeUntil
有效(可能是一个错误或只是我无法解释的行为),fiddle https://dotnetfiddle.net/bLYDgw。我无法用我的自制程序复制它WithCancellation
实现(在小提琴中注释掉)。
已更新,如果我使用,我也无法复制它.TakeUntil(Task.Delay(Timeout.Infinite, token).ToObservable())
.