根据您更新的要求(您想要重试失败的可观察量,而不是只想忽略它们),我们可以提出一个可行的解决方案。
首先,了解冷可观察值(在每个订阅上重新创建)和热可观察值(无论订阅如何都存在)之间的区别很重要。你不能Retry()
一个热门的可观察对象,因为它不知道如何重新创建底层事件。也就是说,如果一个热的可观察错误,它就会永远消失。
Subject
创建一个热可观察对象,从某种意义上说,您可以调用OnNext
没有订阅者,它将按预期运行。要将热可观察量转换为冷可观察量,您可以使用Observable.Defer
,其中将包含该可观察对象的“订阅创建”逻辑。
话虽如此,这里是修改后的原始代码:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}),
Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));
和测试(与之前类似):
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();
以及预期的输出:
1
2
-1
done
当然,您需要根据您底层可观察的内容来显着修改这个概念。使用受试者进行测试与实际使用它们不同。
我还想指出这个评论:
然而,似乎主题将重播所有以前的值
新的订阅(实际上是 Retry() ),将测试变成
无限循环。
事实并非如此——Subject
不这样做。您的代码的其他一些方面导致了无限循环,基于以下事实:Retry
重新创建订阅,并且订阅在某个时刻会产生错误。
原答案(用于完成)
问题是Retry()
不做你想做的事。从这里:
http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx
重复源可观察序列 retryCount 次或直到
它成功终止。
这意味着Retry
将不断尝试重新连接到底层可观察对象,直到成功并且不会抛出错误。
我的理解是你实际上希望可观察到的异常是ignored,未重试。这将做你想做的事:
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);
这使用Catch
捕获异常的可观察量,并在此时将其替换为空的可观察量。
这是使用主题的完整测试:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();
正如预期的那样,这会产生:
1
2
done