C# 反应式扩展 当 OnNext 花费很长时间并且可观察到产生新事件时会发生什么

2024-03-17

我是 Rx 新手,我在想当 IObservable 非常快地产生大量事件而 OnNext 需要很长时间时会发生什么。我猜想新事件会在内部以某种方式排队,这样我就可以运行我们的内存。我对吗?考虑下面的小例子:

        Subject<int> subject = new Subject<int>();
        subject.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {      Console.WriteLine("Value published: {0}", x); Thread.Sleep(100); },
                     () => Console.WriteLine("Sequence Completed."));

        for (int i = 0; i < 10; i++)
        {
            subject.OnNext(i);
        }

我发布了 10 个事件,而消费者方法非常慢。所有事件都经过处理,因此必须缓存在内存中。如果我发布很多事件,我的内存就会耗尽,对吗?或者我错过了什么?

有没有办法限制反应式扩展中待处理事件的数量?例如,如果有超过 5 个待处理事件,我想忽略新事件。


是的,你是对的,慢的消费者会造成排队。最接近您所要求的内置运算符是Observable.Sample- 然而,这会丢弃较旧的事件,转而支持较新的事件。这是更常见的要求,因为它可以让缓慢的观察者赶上。

让我知道 Sample 是否足够,因为您描述的缓冲行为是一个不寻常的要求 - 它是可以实现的,但实现起来相当复杂,并且需要相当重要的代码。

编辑 - 如果您像这样使用示例,它将在每次 OnNext 之后返回最新事件(您需要提供一个调度程序才能使其工作 - 并且 NewThreadScheduler 创建一个线程每次订阅,不是每个事件:

void Main()
{
    var source = Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(100);

    source.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
        .Subscribe(SlowConsumer);

    Console.ReadLine();
}

private void SlowConsumer(long item)
{    
    Console.WriteLine(item + " " + Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(TimeSpan.FromSeconds(1));
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C# 反应式扩展 当 OnNext 花费很长时间并且可观察到产生新事件时会发生什么 的相关文章

随机推荐

  • 如何根据 X509Certificate2Collection 链验证 X509Certificate2

    我正在编写一个 SAML 2 0 响应解析器来处理 ASP Net 中的 POST 身份验证 在 C 和 MVC 中 但这不太相关 所以我有一个 p7b要验证的文件并且可以读入X509Certificate2Collection以及示例断言
  • 像any.do一样弹出窗口

    我正在编写一个应用程序 在弹出窗口中显示未接来电和未读短信 它还具有提醒功能 关闭弹出窗口并在指定时间后打开它 它类似于any do的弹出窗口 我能够使用 WindowManger 创建这样一个窗口 但由于某些我到目前为止不明白的原因 弹出
  • 如何在 ClickOnce 安装中仅更新一个 DLL?

    我正在开发一个大型单击一次应用程序 150MB gt 200 个 DLL 作为交互式调试过程的一部分 我想仅更新 1 个 DLL 并重新启动应用程序 而无需重建和重新部署整个应用程序 那可能吗 如果是这样 怎么办 问题澄清 2009年9月2
  • ImageButton标题标签

    在 ImageButton 上使用 AlternateText 属性向浏览器呈现 alt 标签
  • SQL Server 触发器隔离/范围文档

    我一直在寻找确定的有关 SQL Server 中触发器的隔离级别 或并发性或范围 我不确定确切的称呼它 的文档 我发现以下来源表明我认为是正确的 也就是说 两个用户对同一个表 甚至是相同的行 执行更新 然后将执行独立且隔离的触发器 http
  • Thymeleaf 中 th:each 语句中的 If-Else

    我想要的是 Thymeleaf 中 th each 语句中的 if else If currentSkill null 然后显示包含内容的表格 否则 您没有任何技能 这是没有 if else 的代码 div table tr td td t
  • 递归函数 vs setInterval vs setTimeout javascript

    我正在使用 NodeJs 并需要调用无限函数 但我不知道什么是最佳性能 递归函数 function test my code test 设置时间间隔 setInterval function my code 60 设置超时时间 functi
  • 如何在 Android 中从网络浏览器 (Chrome) 打开任何应用程序?我与 A Href 链接有什么关系?

    我想从我的网络浏览器打开第三方应用程序 所以 我没有任何清单文件或任何东西 我有一个网页 我想要一个可以打开第三方应用程序的链接 例如 Twitter 或 Opera 如何构建锚链接来打开该应用程序 谢谢 基于意图的 URI 的基本语法如下
  • 如何在 PHP 中设置自定义标头

    JAVA 开发人员在标头中向我发送数据 我这样认为 SESSION HTTP COUNTRYNAME 如何使用标头返回响应 它尝试过header countryname USA 但是PHPfunction headers list没有显示它
  • mysql根据之前的记录增加值

    我有一张桌子 Id Parent Counter 1 A NULL 2 A NULL 3 A NULL 4 B NULL 5 B NULL 6 C NULL 7 D NULL 8 D NULL 我想更新表 使计数器列更新 1 与之前的一样长
  • 如何避免在具有许多实例变量的类中使用 getter/setter

    我会尽量保持简短 我的类有很多实例变量 30 因此有很多 getter setter 这些类本身很简单 但由于 getter setter LOC 爆炸了 而且还有太多的代码重复 所以我删除了属性并将它们存储在地图中 如下所示 public
  • 定期轮询 Go 中的 REST 端点

    我正在尝试编写一个 Go 应用程序 定期轮询 PHP 应用程序公开的 REST 端点 Go 轮询应用程序将有效负载读取到结构中并进行进一步处理 我正在寻找一些关于开始实施的建议 最简单的方法是使用 Ticker ticker time Ne
  • d3js 在饼图周围重新分布标签

    我正在使用 d3 js 创建一个外部带有标签的圆环图 我使用基于饼图每片质心的三角函数来定位标签 g append g attr class percentage append text attr transform function d
  • 持久 Akka 邮箱和无损

    在 Akka 中 当一个 actor 在处理消息时死亡 内部onReceive 该消息丢失 有没有办法保证无损 有没有办法配置 Akka 始终保留消息before将他们发送到onReceive 以便在演员死亡时可以恢复并重播 也许像持久邮箱
  • 我应该如何在 Angular 模块中包含模型类?

    我有几个类 我想成为一个普通的 bean DTO 类 它们不显示 component 类 它们不是 Pipe 类 也不应该是 Directive 至少我认为不应该 是 我希望能够将它们捆绑到一个模块中 它们将在其他模块中使用 但尽管有几个咒
  • scala string.split 不起作用

    以下是我的 REPL 输出 我不确定为什么 string split 在这里不起作用 val s Pedro groceries apple 1 42 s java lang String Pedro groceries apple 1 4
  • 根据用户输入更改 AppID 和 AppName

    我想在同一系统上多次安装同一应用程序 例如两个用户使用两个不同的 Web 服务 每个都有自己的 在我的设置脚本中我想更改AppID and AppName基于用户的输入 例如我的默认值AppName Service App 应该改为AppN
  • 如何使用Vault在Ansible v2中运行playbook api

    这是我所拥有的 我知道这无需加密即可运行 并且我可以运行 ansible vault 编辑 common yml with ANSIBLE VAULT PASSWORD FILE vault pass txt 在环境中设置 from col
  • Gerrit 可以在 Gitlab 之前工作吗?还是打算取代它

    我认为我并没有真正理解 Gerrit 如何融入我团队现有的工作流程 Gerrit 是否打算成为代码的中心 枢纽 我一直把它想象成类似于 Atlassian 的 Crucible 它适合 Atlassian 的 Stash 或 Bitbuck
  • C# 反应式扩展 当 OnNext 花费很长时间并且可观察到产生新事件时会发生什么

    我是 Rx 新手 我在想当 IObservable 非常快地产生大量事件而 OnNext 需要很长时间时会发生什么 我猜想新事件会在内部以某种方式排队 这样我就可以运行我们的内存 我对吗 考虑下面的小例子 Subject