如何在发布之前将取消令牌插入 ReactiveX 流(IObservable)?

2024-04-15

如何将取消令牌插入现有的IObservable调用前的管道Publish在其上(即,在它成为IConnectableObservable)?

在订阅它之前,这必须是冷可观察管道的一部分(否则,我可以传递一个CancellationToken令牌到IObservable's Subscribe, RunAsync, ToTask etc).

有推荐的模式吗?

我可以考虑使用TakeUntil https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh229530(v=vs.103)按照 Theodor Zoulias 的建议实现这一目标here https://stackoverflow.com/q/66630603/1768303。例如:

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

async Task Test(CancellationToken token)
{
    var publishedSequence = Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Do(n => Console.WriteLine($"Emitting: {n}"))
        .Skip(3)
        .TakeUntil(
            Observable.Create<long>(
                observer => token.Register(
                    (_, token) => observer.OnError(new OperationCanceledException(token)),
                    null)))
        .Finally(() => Console.WriteLine($"Finally"))
        .Publish();

    using var subscription = publishedSequence.Subscribe(
            onNext: n => Console.WriteLine($"OnNext: {n}"),
            onError: e => Console.WriteLine($"OnError: {e}"),
            onCompleted: () => Console.WriteLine("OnCompleted"));

    using var connection = publishedSequence.Connect();
    await publishedSequence.ToTask();
}

var cts = new CancellationTokenSource(1000);
await Test(cts.Token);

Ouput:

Emitting: 0
Emitting: 1
Emitting: 2
Emitting: 3
OnNext: 3
Emitting: 4
OnNext: 4
Emitting: 5
OnNext: 5
Emitting: 6
OnNext: 6
Emitting: 7
OnNext: 7
Emitting: 8
OnNext: 8
OnError: System.OperationCanceledException: The operation was canceled.
Finally

我还有一个自定义运算符的原型,WithCancellation,这基本上是一个传递IObservable还监听取消信号。不过,我宁愿坚持使用标准方法。

Updated,我想我已经找到了竞争条件TakeUntil有效(可能是一个错误或只是我无法解释的行为),fiddle https://dotnetfiddle.net/bLYDgw。我无法用我的自制程序复制它WithCancellation实现(在小提琴中注释掉)。

已更新,如果我使用,我也无法复制它.TakeUntil(Task.Delay(Timeout.Infinite, token).ToObservable()).


事实上,没有库方法可以做到这一点。我将从 CancellationToken 创建一个可观察对象,然后使用 TakeUntil 运算符。

public static IObservable<Unit> ToObservable(this CancellationToken cancellationToken) {
  // if the token can't be cancelled, use Never which will not complete
  if (!cancellationToken.CanBeCanceled) {
    return Observable.Never<Unit>();
  }

  // if the token is already cancelled, use Return which publishes
  // Unit and completes immediately.
  // This may save you from ObjectDisposedException later
  if (cancellationToken.IsCancellationRequested) {
    return Observable.Return<Unit>();
  }

  // use Create so that each .Subscribe is handled independently
  return Observable.Create<Unit>(observer => {

    // Observable.Create does not handle errors on its own
    try {
      // return the registration because Dispose will unregister it
      return cancellationToken.Register(() => {
        // When the token is cancelled, publish and complete
        observer.OnNext(Unit.Default);
        observer.OnCompleted();
      });
    }
    catch (ObjectDisposedException e) {
      // todo: consider handling this as if it were cancellation
      observer.OnError(e);
    }
  });
}

public static IObservable<T> TakeUntil<T>(this IObservable<T> source, CancellationToken cancellationToken)
  => source.TakeUntil(cancellationToken.ToObservable());
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在发布之前将取消令牌插入 ReactiveX 流(IObservable)? 的相关文章

随机推荐