最近我意识到 RxFinally https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh212133(v=vs.103)操作符的行为方式至少对我来说是出乎意料的。我的期望是抛出的任何错误finallyAction
将传播到下游运营商的观察者。可惜事实并非如此。现实中运营商first将先行序列的完成(或失败)传播给其观察者,并且then调用action
,在不可能传播操作引发的潜在错误的时间点。所以它会抛出错误ThreadPool
,并使进程崩溃。这不仅出乎意料,而且存在很大问题。下面是此行为的最小演示:
Observable
.Timer(TimeSpan.FromMilliseconds(100))
.Finally(() => throw new ApplicationException("Oops!"))
.Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Completed"));
Thread.Sleep(1000);
结果:未处理的异常(Fiddle https://dotnetfiddle.net/o89QpE)
抛出的异常Finally
lambda 不由Subscribe
:onError
处理程序,因为这是可取的。
这个功能(我很想称其为缺陷)严重限制了该功能的实用性Finally
我眼中的运营商。本质上,我只能在我想要调用一个预计永远不会失败的操作时使用它,如果它失败,则表明应用程序状态发生灾难性损坏,并且无法恢复。我可以用它来举例Release
a SemaphoreSlim
(就像我所做的那样here https://stackoverflow.com/questions/64841312/how-to-merge-multiple-observables-with-order-preservation-and-maximum-concurrenc/64880836#64880836例如),只有当我的代码有错误时才会失败。在这种情况下我的应用程序崩溃是可以接受的。但我也用过recently https://stackoverflow.com/questions/69799279/is-it-possible-to-use-rx-using-operator-with-iasyncdisposable/69801283#69801283调用调用者提供的未知操作,该操作可能会失败,并且在这种情况下使应用程序崩溃是不可接受的。相反,错误应该传播到下游。所以我在这里问的是如何实现Finally
变体(我们称之为FinallySafe
)具有相同的签名,并且行为如下:
public static IObservable<TSource> FinallySafe<TSource>(
this IObservable<TSource> source, Action finallyAction);
- The
finallyAction
应该被调用after the source
序列已发出OnCompleted
or an OnError
通知,但是before该通知被传播给观察者。
- If the
finallyAction
调用成功完成,原来的OnCompleted
/OnError
通知应传播给观察者。
- If the
finallyAction
调用失败,OnError
通知应该传播给观察者,其中包含刚刚发生的错误。在这种情况下,前一个错误可能导致source
完成失败,应被忽略(不传播)。
- The
finallyAction
当FinallySafe
在完成之前取消订阅source
。当订阅者(观察者)处置订阅时,finallyAction
应该同步调用,并且任何错误都应该传播给调用者Dispose
method.
- If the
FinallySafe
被多个观察者订阅,finallyAction
应遵循上述规则,为每个订阅者独立地为每个订阅者调用一次。并发调用是可以的。
- The
finallyAction
每个订阅者不应被多次调用。
验证:替换Finally
与FinallySafe
在上面的代码片段中,应该会导致程序不会因未处理的异常而崩溃。
选择:我也愿意接受一个答案,它可以合理解释为什么内置函数的行为Finally
操作员的行为优于自定义的行为FinallySafe
运算符,如上所述。