非重放热可观察

2024-01-17

原问题

我有一个场景,我有多个IObservable我想要组合的序列Merge然后听。但是,如果其中之一产生错误,我不希望它使其他流的所有内容崩溃,也不希望重新订阅序列(这是一个“永远持久”的序列)。

我通过附加一个来做到这一点Retry()合并之前的流,即:

IEnumerable<IObservable<int>> observables = GetObservables();

observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);

然而,当我想测试这个时,问题就出现了。我想测试的是,如果其中之一IObservables in observables产生一个OnError,其他人应该仍然能够发送他们的值,并且他们应该得到处理

我以为我只用两个Subject<int>s代表两个IObservables in observables;一个发送一个OnError(new Exception())另一个,在那之后,发送OnNext(1)。然而,似乎Subject<int>将重播新订阅的所有先前值(这实际上是Retry()是),将测试变成无限循环。

我尝试通过创建手册来解决这个问题IObservable这会在第一个订阅上产生错误,然后产生一个空序列,但感觉很奇怪:

var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});

我正在使用吗Subject或思考Retry()以错误的方式?对此还有其他想法吗?你会如何解决这种情况?

Update

好的,这是我想要的大理石图think Retry() does.

o = message, X = error.
------o---o---X
               \
     Retry() -> \---o---o---X
                             \
                   Retry() -> \...

我的问题可能更多是因为我没有一个好的库存类来使用前测试,因为Subject想要重播我以前的所有错误。

Update 2

这是一个测试用例,显示了我的意思Subject重演它的价值观。如果我说它在 a 中这样做,我使用这个术语是否正确?cold方式?我知道Subject是创建热可观察值的一种方式,但这种行为对我来说仍然感觉“冷”。

var onNext = false;
var subject = new Subject<int>();

subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);

Assert.That(onNext, Is.True);

根据您更新的要求(您想要重试失败的可观察量,而不是只想忽略它们),我们可以提出一个可行的解决方案。

首先,了解冷可观察值(在每个订阅上重新创建)和热可观察值(无论订阅如何都存在)之间的区别很重要。你不能Retry()一个热门的可观察对象,因为它不知道如何重新创建底层事件。也就是说,如果一个热的可观察错误,它就会永远消失。

Subject创建一个热可观察对象,从某种意义上说,您可以调用OnNext没有订阅者,它将按预期运行。要将热可观察量转换为冷可观察量,您可以使用Observable.Defer,其中将包含该可观察对象的“订阅创建”逻辑。

话虽如此,这里是修改后的原始代码:

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}), 
                                               Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };                                            

observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));

和测试(与之前类似):

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();

以及预期的输出:

1
2
-1
done

当然,您需要根据您底层可观察的内容来显着修改这个概念。使用受试者进行测试与实际使用它们不同。

我还想指出这个评论:

然而,似乎主题将重播所有以前的值 新的订阅(实际上是 Retry() ),将测试变成 无限循环。

事实并非如此——Subject不这样做。您的代码的其他一些方面导致了无限循环,基于以下事实:Retry重新创建订阅,并且订阅在某个时刻会产生错误。


原答案(用于完成)

问题是Retry()不做你想做的事。从这里:

http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx

重复源可观察序列 retryCount 次或直到 它成功终止。

这意味着Retry将不断尝试重新连接到底层可观察对象,直到成功并且不会抛出错误。

我的理解是你实际上希望可观察到的异常是ignored,未重试。这将做你想做的事:

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);

这使用Catch捕获异常的可观察量,并在此时将其替换为空的可观察量。

这是使用主题的完整测试:

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();

正如预期的那样,这会产生:

1
2
done
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

非重放热可观察 的相关文章

  • 获取两个字符串之间的公共部分c# [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我需要的是获取两个单词之间的共同部分并获取差异 例子 场景1 word1 感言 word2 Test 将返回 公共部分Test 不同之
  • ASP .NET MVC,创建类似路由配置的永久链接

    我需要帮助在 MVC 网站中创建类似 URL 路由的永久链接 Slug 已设置为 www xyz com profile slug 代码为 routes MapRoute name Profile url profile slug defa
  • Guid 应包含 32 位数字和 4 个破折号

    我有一个包含 createuserwizard 控件的网站 创建帐户后 验证电子邮件及其验证 URL 将发送到用户的电子邮件地址 但是 当我进行测试运行时 单击电子邮件中的 URL 时 会出现以下错误 Guid should contain
  • TextBox 焦点的 WinForms 事件?

    我想添加一个偶数TextBox当它有焦点时 我知道我可以用一个简单的方法来做到这一点textbox1 Focus并检查布尔值 但我不想那样做 我想这样做 this tGID Focus new System EventHandler thi
  • 为什么密码错误会导致“填充无效且无法删除”?

    我需要一些简单的字符串加密 所以我编写了以下代码 有很多 灵感 来自here http www codeproject com KB security DotNetCrypto aspx create and initialize a cr
  • 如何用 kevent() 替换 select() 以获得更高的性能?

    来自Kqueue 维基百科页面 http en wikipedia org wiki Kqueue Kqueue 在内核和用户空间之间提供高效的输入和输出事件管道 因此 可以修改事件过滤器以及接收待处理事件 同时每次主事件循环迭代仅使用对
  • 单元测试失败,异常代码为 c0000005

    我正在尝试使用本机单元测试项目在 Visual Studios 2012 中创建单元测试 这是我的测试 TEST METHOD CalculationsRoundTests int result Calculations Round 1 0
  • 组合框项目为空但数据源已满

    将列表绑定到组合框后 其 dataSource Count 为 5 但组合框项目计数为 0 怎么会这样 我习惯了 Web 编程 而且这是在 Windows 窗体中进行的 所以不行combo DataBind 方法存在 这里的问题是 我试图以
  • C# 创建数组的数组

    我正在尝试创建一个将使用重复数据的数组数组 如下所示 int list1 new int 4 1 2 3 4 int list2 new int 4 5 6 7 8 int list3 new int 4 1 3 2 1 int list4
  • std::bind 重载解析

    下面的代码工作正常 include
  • 在 C 中复制两个相邻字节的最快方法是什么?

    好吧 让我们从最明显的解决方案开始 memcpy Ptr const char a b 2 调用库函数的开销相当大 编译器有时不会优化它 我不会依赖编译器优化 但即使 GCC 很聪明 如果我将程序移植到带有垃圾编译器的更奇特的平台上 我也不
  • 通过等待任务或访问其 Exception 属性都没有观察到任务的异常

    这些是我的任务 我应该如何修改它们以防止出现此错误 我检查了其他类似的线程 但我正在使用等待并继续 那么这个错误是怎么发生的呢 通过等待任务或访问其 Exception 属性都没有观察到任务的异常 结果 未观察到的异常被终结器线程重新抛出
  • 从匿名类型获取值

    我有一个方法如下 public void MyMethod object obj implement 我这样称呼它 MyMethod new myparam waoww 那么我该如何实施MyMethod 获取 myparam 值 Edit
  • string.Compare 行为

    怎么会这样呢 这是从VS2008中的立即窗口获取的 string Compare 1 string Compare 0 0 1 从言论来看字符串比较 http msdn microsoft com en us library 84787k2
  • 内核开发和 C++ [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 从我know https stackoverflow com questions 580292 what languages are windo
  • .NET UI 元素线程限制的原因

    我们知道 除了实例化元素的线程之外 不可能从任何线程执行操作任何 UI 元素属性的代码 我的问题是 为什么 我记得当我们使用 COM 用户界面元素时 在 COM Visual Basic 6 0 时代 所有 UI 元素都是使用 COM 类和
  • 我应该在应用程序退出之前运行 Dispose 吗?

    我应该在应用程序退出之前运行 Dispose 吗 例如 我创建了许多对象 其中一些对象具有事件订阅 var myObject new MyClass myObject OnEvent OnEventHandle 例如 在我的工作中 我应该使
  • 为什么 Ajax.BeginForm 在 Chrome 中不起作用?

    我正在使用 c NET MVC2 并尝试创建一个 ajax 表单来调用删除数据库记录 RemoveRelation 的方法 删除记录的过程正在按预期进行 删除记录后 表单应调用一个 JavaScript 函数 从视觉效果中删除该记录 Rem
  • 在基类集合上调用派生方法

    我有一个名为 A 的抽象类 以及实现 A 的其他类 B C D E 我的派生类持有不同类型的值 我还有一个 A 对象的列表 abstract class A class B class A public int val get privat
  • WPF/数据集:如何通过 XAML 将相关表中的数据绑定到数据网格列中?

    我正在使用 WPF DataSet 连接到 SQL Server Express XAML 和 C Visual Studio 2013 Express 我从名为 BankNoteBook 的现有 SQL Server Express 数据

随机推荐