我观察到一个奇怪的现象,有时会在我编写的 Rx 查询中发生,其中涉及CancellationToken
。两个回调注册到同一个CancellationToken
,一个在查询之外,一个是查询的一部分。的意图CancellationToken
是发出查询终止的信号。发生的情况是,有时第二个回调卡在执行中间,永远不会完成,从而阻止调用第一个回调。
下面是重现该问题的最小示例。虽然不是很小,但我不能再减少了。例如更换Switch
运算符与Merge
使问题消失。如果抛出异常,也会发生同样的情况Task.Delay(1000, cts.Token)
被吞掉了。
public class Program
{
public static void Main()
{
var cts = new CancellationTokenSource(500);
cts.Token.Register(() => Console.WriteLine("### Token Canceled! ###"));
try
{
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(1000))
.TakeUntil(Observable.Create<Unit>(observer =>
cts.Token.Register(() =>
{
Console.WriteLine("Before observer.OnNext");
observer.OnNext(Unit.Default);
Console.WriteLine("After observer.OnNext");
})))
.Select(_ =>
{
return Observable.StartAsync(async () =>
{
Console.WriteLine("Action starting");
await Task.Delay(1000, cts.Token);
return 1;
});
})
.Switch()
.Wait();
}
catch (Exception ex) { Console.WriteLine("Failed: {0}", ex.Message); }
Thread.Sleep(500);
Console.WriteLine("Finished");
}
}
预期输出:
Action starting
Before observer.OnNext
After observer.OnNext
### Token Canceled! ###
Failed: A task was canceled.
Finished
实际输出(有时):
Action starting
Before observer.OnNext
Failed: A task was canceled.
Finished
在小提琴上尝试一下。您可能需要运行该程序 3-4 次才能出现问题。请注意两个缺失的日志条目。看来是调用observer.OnNext(Unit.Default);
永远不会完成。
我的问题是:有谁知道导致这个问题的原因是什么?另外,我该如何修改CancellationToken
- 查询的相关部分,以便它执行其预期目的(终止查询),而不会干扰相同的其他注册回调CancellationToken
?
.NET 5.0.1 和 .NET Framework 4.8、System.Reactive 5.0.0、C# 9
Update:还有 .NET 6.0 和 System.Reactive 5.0.0 (截屏拍摄于 2022 年 6 月 4 日)
还有一项观察:如果我修改以下内容,问题就会停止出现Observable.Create
委托,以便它返回一个Disposable.Empty
代替CancellationTokenRegistration
, 像这样:
.TakeUntil(Observable.Create<Unit>(observer =>
{
cts.Token.Register(() =>
{
Console.WriteLine("Before observer.OnNext");
observer.OnNext(default);
Console.WriteLine("After observer.OnNext");
});
return Disposable.Empty;
}))
但我不认为忽略由返回的注册cts.Token.Register
是一个修复。