为什么每个观察委托都在新线程上运行

2023-11-29

在 Rx 中,当使用 Scheduler.NewThread 作为 ObserveOn 方法时,当 Rx 已经保证 OnNext 永远不会重叠时,让每个观察委托 (OnNext) 在新线程上运行有什么好处。如果每个 OnNext 都会被一个接一个地调用,为什么每个都需要新的线程呢?

我理解为什么人们想要在与订阅和应用程序线程不同的线程上运行观察委托,但在新线程上运行每个观察委托,而它们永远不会并行运行?......对我或我来说没有意义我在这里缺少一些东西吗?

例如

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

            var numbers = from number in Enumerable.Range(1,10) select Process(number);

            var observableNumbers = numbers.ToObservable()
                .ObserveOn(Scheduler.NewThread)
                .SubscribeOn(Scheduler.NewThread);

            observableNumbers.Subscribe(
                n => Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId));

            Console.ReadKey();
        }

        private static int Process(int number)
        {
            Thread.Sleep(500);
            Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);

            return number;
        }
    }
}

上面的代码产生以下结果。请注意,每次消费都是在一个新线程上完成的。

Application Thread : 8
Producing : 1    on Thread : 9
Consuming : 1    on Thread : 10
Producing : 2    on Thread : 9
Consuming : 2    on Thread : 11
Producing : 3    on Thread : 9
Consuming : 3    on Thread : 12
Producing : 4    on Thread : 9
Consuming : 4    on Thread : 13
Producing : 5    on Thread : 9
Consuming : 5    on Thread : 14
Producing : 6    on Thread : 9
Consuming : 6    on Thread : 15
Producing : 7    on Thread : 9
Consuming : 7    on Thread : 16
Producing : 8    on Thread : 9
Consuming : 8    on Thread : 17
Producing : 9    on Thread : 9
Consuming : 9    on Thread : 18
Producing : 10   on Thread : 9
Consuming : 10   on Thread : 19

NewThread 调度程序对于长时间运行的订阅者很有用。如果您未指定任何调度程序,则生产者将被阻止等待订阅者完成。通常,您可以使用 Scheduler.ThreadPool,但如果您希望有很多长时间运行的任务,您将不希望用它们阻塞您的线程池(因为它可能被多个可观察的订阅者使用) )。

例如,请考虑对您的示例进行以下修改。我将延迟移至订阅者,并添加了主线程何时准备好键盘输入的指示。请注意取消注释 NewThead 行时的差异。

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

            var numbers = from number in Enumerable.Range(1, 10) select Process(number);

            var observableNumbers = numbers.ToObservable()
//              .ObserveOn(Scheduler.NewThread)
//              .SubscribeOn(Scheduler.NewThread)
            ;

            observableNumbers.Subscribe(
                n => {
                    Thread.Sleep(500);
                    Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
                });

            Console.WriteLine("Waiting for keyboard");
            Console.ReadKey();
        }

        private static int Process(int number)
        {
            Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);

            return number;
        }
    }
}

那么为什么 Rx 不优化为每个订阅者使用相同的线程呢?如果订阅者运行时间太长以至于您需要一个新线程,那么线程创建开销无论如何都是微不足道的。一个例外是,如果大多数订阅者都很短,但少数订阅者长时间运行,那么重用同一线程的优化确实会很有用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么每个观察委托都在新线程上运行 的相关文章

  • 简单的C问题

    作为我正在进行的项目的一部分 我必须开始学习 C 我已经开始解决其中的 欧拉 问题 并且遇到了麻烦 我必须找到 1000 以下的所有 3 或 5 的倍数之和 有人可以帮助我吗 谢谢 include
  • 数据结构的优化存储以实现快速查找和持久化

    Scenario 我有以下方法 public void AddItemSecurity int itemId int userIds public int GetValidItemIds int userId 最初我正在考虑表单上的存储 i
  • 在 C++ 中,当我将值传递给函数时,它是否总是转换为适当的类型?

    如果我有一个像这样的函数void func size t x 我称该函数为func 5 5 立即转换为size t类型 这通常适用于所有类型吗 我问这个问题是因为我发誓我见过人们编写代码 他们做类似的事情func 5 0 将 5 作为双精度
  • 如何将异常对象序列化为 xml 字符串

    我想要类似的东西 try code here catch Exception ex stringXML Exception toXML 这样 stringXML 的值就是
  • 函数的不明确的引用/值版本

    考虑以下函数原型 void Remove SomeContainer Vec const std size t Index SomeContainer Remove SomeContainer Vec const std size t In
  • Windows CE 6.0 和运行时链接到调试 DLL /MDd

    我在 x86 PC 上使用 Windows CE 6 0 R3 我已经为该平台构建了 NK bin 和 SDK 但我有一些问题需要了解如何使用 MTd 调试 DLL 构建控制台应用程序 如果我尝试构建这个 main c with MDd i
  • 如何在Qt中更快地读取数据?

    Qt读取数据库比C 慢吗 我想我错过了一些东西 为了比较阅读速度 我在 Qt 中编写了以下内容 QElapsedTimer t t start int count 0 QString cs Driver SQL Server Server
  • ASP.NET Core 中 AsNoTracking 的模拟或更好的解决方法

    您如何模拟 AsNoTracking 或者是否有更好的解决方法来解决此问题 Example public class MyContext MyContextBase Constructor public MyContext DbContex
  • 通过 EUSART PIC18F45K80 打印消息

    我正在尝试向 Docklight 发送串行消息 但始终收到空值 我正在使用带有 XC8 MPLAB X 的 PIC18F45K80 我的代码中的所有内容似乎都是正确的 但我想我错了 我该如何修复它 include
  • 使用 CMake 对 SDL 的未定义引用

    我正在使用 SDL v1 2 15 7 和 CMake 3 2 1 开发一个项目 在 h 文件中我添加了 include
  • memccpy 返回比 src 起始地址更低的内存地址

    我有一个学校项目 我必须重新编码memccpy 功能 我使用 2 个程序来检查我的代码是否正常工作 第一个是只有一个主程序的小程序 第二个程序是另一个学生开发的 可以找到here https github com yyang42 mouli
  • 为什么Windsor只能拦截虚方法或接口方法?

    我正在阅读文档 发现如果不使用接口 那么 Windsor 只能拦截虚拟方法 这是 Windsor 的限制还是 C 语言的限制 我正在寻找深入的答案 C 语言在这里完全无关 问题是拦截在运行时级别如何工作 一种技术是从类继承 实现接口并将其用
  • 使用 C# 和 .NET Core 在 AWS Cognito 用户池中进行用户管理

    如何使用 C 和 NET Core 3 x 管理 AWS Cognito 用户池中的用户 在文档中找不到有关它的任何内容 Attilio Gelosa 的原创文章 我写这篇文章是希望对其他人有帮助 我必须阅读一页又一页的文档 并从 AWS
  • 函数中的重复参数检查

    我经常有调用层次结构 因为所有方法都需要相同的参数 如果我不想将它们放在实例级别 类的成员 那么我总是问我在每个方法中检查它们的有效性是否有意义 例如 public void MethodA object o if null o throw
  • 如何明智地解释这个编译器警告?

    当我执行这段代码时question https stackoverflow com a 51056490 2411320 我收到这个警告 warning format d expects argument of type int but a
  • 更改成员资格、角色等的默认连接字符串

    默认情况下 我的网络应用程序似乎正在使用LocalSqlServer作为用于任何应用程序服务 例如成员资格 角色 身份验证 等 的连接字符串 有什么方法可以更改默认连接字符串应该是什么 默认值是 LocalSqlServer 似乎很随意 我
  • 使用 QTestLib 时抑制 qDebug

    我正在向 Qt 中的项目添加单元测试 并希望使用 QTestLib 我已经设置了测试并且它们运行良好 问题是在项目中我们重写了 qDebug 以输出到我们自己的日志文件 这在运行应用程序时效果很好 问题是当我测试类时 它有时会开始记录 然后
  • 如何在您的网站中连接两个人

    有一款名为 Verbosity 的游戏 这是一款有目的的游戏 位于此链接上www gwap com 在游戏中 他们随机连接两个玩家互相玩 游戏是玩家1应该向他的搭档 玩家2 描述一个单词 而玩家2应该猜测这个单词 我正在尝试建立一个网站来执
  • 频繁插入已排序的集合

    我已经对集合 列表 进行了排序 并且我需要始终保持其排序 我目前在我的集合上使用 List BinarySearch 然后在正确的位置插入元素 我也尝试过在每次插入后对列表进行排序 但性能不可接受 有没有一种解决方案可以提供更好的性能 也许
  • 清理 TPL 中的 CallContext

    根据我使用的是基于 async await 的代码还是基于 TPL 的代码 我在逻辑清理方面得到了两种不同的行为CallContext 我可以设置和清除逻辑CallContext如果我使用以下异步 等待代码 正如我所期望的 class Pr

随机推荐

  • Visual Studio 2015 中的向导中缺少 PostgreSQL 数据提供程序

    我花了一天时间尝试将 Entity Framework 6 SQL Server CE 迁移到 PostgreSQL 我已经很好地复制了数据库 但我似乎无法让数据提供程序正常工作 首先 我尝试了旧版 2 2 7 版本的 EF 提供程序 它不
  • com.google.android.maps.MapView ClassNotFoundException

    我正在尝试执行 google 地图活动 但收到 LogCat 错误并且我的应用程序崩溃了 我在注册 API 密钥时使用了 android 提供的编码 所以我不知道为什么它不起作用 我需要帮助解决这个问题 LogCat 08 10 11 04
  • 列表列表的唯一性

    我很好奇什么是唯一化此类数据对象的有效方法 testdata 9034968 ETH 14160113 ETH 9034968 ETH 11111 NOT 9555269 NOT 15724032 ETH 15481740 ETH 1548
  • 如何停止一个线程?

    当线程处于活动状态时 如何停止该线程 我已经给了喜欢 if thread isAlive thread stop 但方法 stop 已被弃用并引发异常 01 21 14 12 40 188 ERROR global 535 Deprecat
  • 是创建新的活动更好还是只是创建不同的布局并替换现有的布局?

    由于我是 Android 新手 我现在正在思考什么是正确的做事方式 目前 我正在编写的应用程序有 4 个不同的屏幕 屏幕 1 节点列表 主屏幕 屏幕 2 选项菜单 带按钮的表格布局 屏幕 3 导航 屏幕 4 有关版本等的文本详细信息 可以使
  • 在 Android 中模糊图像

    我正在使用以下代码来模糊 android 中的图像 但它不起作用 我得到的最终图像颜色非常扭曲 而不是我想要的模糊 我究竟做错了什么 public Bitmap blurBitmap Bitmap bmpOriginal int width
  • 如何从 Eval: in .dir-local.el 设置缓冲区局部变量?

    为什么这有效 nil compilation directory home vava code directory compilation command rake 这不是吗 nil Eval setq compilation direct
  • PHP Google Sheets API - 权限被拒绝

    您好 我刚刚开始学习 PHP 并且正在尝试使用 Google Sheets API 我以为我遵循了他们的指示 我可以很好地调用和编辑工作表数据 但只能公共访问 当我将访问状态更改为私人或组时 我遇到以下消息 PHP Fatal error
  • 使用 AspNet.Security.OpenIdConnect.Server 注销 (ASP.NET vNext)

    我正在使用 Visual Studio 2015 Enterprise 和 ASP NET vNext Beta8 来发行和使用 JWT 令牌 如下所述here 在我们的实现中 我们在令牌发布时将一些客户端详细信息存储在 Redis 中 并
  • 转换为 .NET 4 后,在 VS2010 中将字段附加到记录集时出现奇怪的错误

    我有这个网站的一些代码代码项目链接将数据表转换为记录集 这段代码一直工作正常 直到我更改为 NET 4 以前是 2 现在当我调用以下行时 Dim result As New ADODB Recordset result CursorLoca
  • 如何使用 Indy TIdTCPServer 跟踪客户端数量

    我想知道当前到 Indy 9 TIdTCPServer 的客户端连接数 在 Delphi 2007 上 我似乎找不到提供此功能的属性 我尝试在服务器 OnConnect OnDisconnect 事件上增加 减少计数器 但当客户端断开连接时
  • 我该如何处理代码以避免被杀?

    I got Killed运行一段代码后 代码的第一部分是 def load data distance file distance min dis max dis sys float info max 0 0 num 0 with open
  • MongoDB批量插入忽略重复

    我用谷歌搜索了一下 找不到任何关于如何在使用批量插入时忽略重复错误的可靠信息 这是我当前使用的代码 MongoClient connect mongoURL function err db if err console err err le
  • PHP 将月份数字转换为短月份名称[重复]

    这个问题在这里已经有答案了 我需要将月份编号转换为短月份名称 即 1 表示一月 2 表示二月 我知道我可以通过数组来实现这一点 但是还有其他方法吗 帮助表示赞赏 Thanks 就在这里 使用date stftime结合mktime创建所需月
  • yang 中默认值的条件赋值

    我的模型有两个属性 叶协议 叶端口 我想具体说明的是 如果协议 ssh 那么默认端口值为 22 如果协议 http 那么默认端口值为 80 etc 我该如何用 yang 来表达这个意思 没有条件的defaultYANG 值 你需要两个def
  • OpenQA.Selenium.WebDriverException:“无法在 http://localhost:60623/ 上启动驱动程序服务”

    以下代码在过去效果很好 几天后 我尝试再次运行它 但它抛出了这样的错误 using OpenQA Selenium using OpenQA Selenium Chrome using OpenQA Selenium Support UI
  • 如何移动自定义对话框?

    我是安卓初学者 我正在创建一个自定义对话框 它工作正常 但这个对话框没有动 如何移动这个自定义对话框 例如在Windows中包含记事本 画图等 当您单击此 记事本 画图等 标题栏时可以移动位置 如果可能 请发送如何移动对话框的信息 否则 如
  • window.print 不适用于 Opera 浏览器

    我正在尝试使用 javascript 代码在 Opera 浏览器中打开打印对话框 就好像我使用以下代码 Opera 浏览器可以理解并能够打开打印对话框
  • 如果元素存在等待它消失

    所以我正在尝试编写一些 cypress 代码 但我认为文档并不是很清楚 我有两种情况 页面加载时没有加载微调器 页面通过加载微调器加载 我想编写能够满足这两种情况的代码 并让测试继续进行 如果页面没有加载微调器元素 照常继续测试 如果页面确
  • 为什么每个观察委托都在新线程上运行

    在 Rx 中 当使用 Scheduler NewThread 作为 ObserveOn 方法时 当 Rx 已经保证 OnNext 永远不会重叠时 让每个观察委托 OnNext 在新线程上运行有什么好处 如果每个 OnNext 都会被一个接一