Rx学习之面向 .NET 和 C# 开发人员的响应式编程 - IEnumerable、IQueryable、IObservable 和 IQbservable 简介

2023-05-16

前言

响应式扩展已经存在了一段时间,在这篇文章中,我们应该更详细地讨论反应式扩展。此外,在这篇文章中,我们将触及IQbservables——世界上最神秘的名字/界面,可能是在希格斯玻色子之后。推送和拉取序列无处不在 - 现在设备在一端,云在另一端,大多数数据交易都是通过推/拉序列进行的。因此,必须掌握有关其周围编程模型的基本概念

第一件事

让我们退后一步,先讨论IEnumerable和IQueryable,然后再进一步讨论Reactive IObservable和IQbservable(Qbservables = Queryable Observables - 哦,是的,有趣的名字)。

IEnumerable

您可能知道,IEnumerable 模型可以被视为拉取操作。你将获得一个枚举器,然后通过使用 MoveNext 对一组项向前移动来迭代集合,直到到达最终项。当环境从外部源请求数据时,拉取模型非常有用。为了涵盖一些基础知识 - IEnumerable 有一个 GetEnumerator 方法,该方法返回一个带有 MoveNext() 方法和 Current 属性的枚举器。脱机提示 - C# foreach 语句可以迭代任何可以返回 GetEnumerator 的愚蠢事物。无论如何,这是IEnumerable的非通用版本的样子:

public interface IEnumerable
{
    IEnumerator GetEnumerator();
}

public interface IEnumerator
{
    Object Current {get;}
    bool MoveNext();
    void Reset();
}

现在,LINQ 在 IEnumerable 的泛型版本(即 IEnumerable)之上定义了一组运算符作为扩展方法,因此,通过利用泛型方法的类型推断支持,可以在任何 IEnumerable 上调用这些方法,而无需指定类型。 也就是说,你可以说someStringArray.Count()而不是someStringArray.Count()。可以浏览 Enumerable 类来查找这些静态扩展。

现在,LINQ 在 IEnumerable 的泛型版本(即 IEnumerable)之上定义了一组运算符作为扩展方法。 因此,通过利用泛型方法的类型推断支持,可以在任何 IEnumerable 上调用这些方法,而无需指定类型。 也就是说,你可以说someStringArray.Count()而不是someStringArray.Count().可以浏览 Enumerable 类来查找这些静态扩展。

在这种情况下,具有相关表达式的实际查询运算符(如 Where、Count 等)将编译为 IL,并且它们在与 CLR 执行的任何 IL 代码非常相似的进程中运行。从实现的角度来看,像 Where 这样的 LINQ 子句的参数是一个 lambda 表达式(你可能已经知道,from… select 只是语法糖,扩展到 IEnumerable 的扩展方法),在大多数情况下,像 Func<T,…> 这样的委托可以从内存中的角度表示表达式。但是,如果您想对位于其他位置的项目应用查询运算符怎么办?例如,如何在存储在可能位于云中的数据库中的表中的一组数据行上应用 LINQ 运算符,而不是在 IEnumerable 的内存中集合之上应用?这正是IQueryable的用途。

IQueryable

IQueryable 是一个 IEnumerable(它继承 自 IEnumerable),它指向一个可以在远程环境中执行的查询表达式。用于查询 IQueryable 类型的对象的 LINQ 运算符在 Queryable 类中定义,当您将它们应用于 IQueryable(即 System.Linq.Expressions.Expression)时,它们将返回 Expression<Func<T…>>(您可以在此处阅读有关表达式树的信息)。这将通过查询提供程序转换为远程世界(例如SQL系统)。因此,从本质上讲,IQueryable 具体实现指向查询表达式和查询提供程序 - 查询提供程序的工作是将查询表达式转换为执行它的远程世界的查询语言。从实现的角度来看,传递给 LINQ 的应用于 IQueryable 的参数将分配给表达式<T,…>。.NET 中的表达式树提供了一种将代码表示为数据或一种抽象语法树的方法。稍后,查询提供程序将演练此操作以在远程世界中构造等效查询。

public interface IQueryable : IEnumerable {       
    Type ElementType { get; }
    Expression Expression { get; }
    IQueryProvider Provider { get; }
}
public interface IQueryable<T> : IEnumerable<T>, IQueryable, IEnumerable {
   ..
} 

例如,在 LINQ to Entity Framework 或 LINQ to SQL,查询提供程序会将表达式转换为 SQL 并将其移交给数据库服务器。您甚至可以通过查看它们来查看目标查询语言 (SQL) 的翻译,或者简而言之,您在 IQueryable 上应用的 LINQ 查询运算符将用于构建表达式树,查询提供程序将翻译该运算符以在远程环境中构建和执行查询。如果您不清楚如何使用 Lambdas 中的表达式构建表达式树,请阅读本文。

反应式扩展

所以,现在让我们进入可观察量的解剖学和哲学。

IObservable

正如我们所讨论的,IEnumerable 类型的对象是拉取序列。但是,在现实世界中,有时我们也会推动事物 - 而不仅仅是拉动。(健康警报 - 当您同时执行这两项操作时,请确保安全执行)。在很多情况下,推送模式很有意义——例如,你不是和邻居在当地邮局门口日夜无限地排队等待收集蜗牛邮件,邮局代理会在邮件到达时将邮件推送到你家。

现在,关于推拉序列的一个很酷的事情是,它们是双的。这也意味着,IObservable 是 IEnumerable 的对偶 – 请参阅下面的代码。因此,为了简短起见,使用分类对偶性派生的IEnumerable的双接口是IObservable的。这个故事就像埃里克团队中的一些成员(他当时在微软工作)在发现这种二元性时,有一个当之无愧的时间狂妄自大过度活跃的高峰。如果你更感兴趣,这里有一篇来自埃里克的漂亮论文——埃里克论文的简要摘要如下。

//Generic version of IEnumerable, ignoring the non generic IEnumerable base
interface IEnumerable<out T>
{
    IEnumerator<T> GetEnumerator();
}

interface IEnumerator<out T>: IDisposable
{
    bool MoveNext(); // throws Exception
    T Current { get; } 
}

//Its dual IObservable
interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

interface IObserver<in T>
{
    void OnCompleted(bool done);
    void OnError(Exception exception);
    T OnNext { set; } 
}

令人惊讶的是,IObservable 实现看起来像观察者模式。

现在,LINQ 运算符很酷。它们非常富有表现力,并提供查询事物的抽象。因此,响应式团队中的疯狂家伙认为他们应该使用 LINQ 来对抗事件流。事件流实际上是推送序列,而不是拉取序列。因此,他们构建了IObservable。IObservable 结构允许您在推送序列(如事件流)之上编写 LINQ 运算符,这与查询 IEnumerable 的方式大致相同。IObservable 类型的对象的 LINQ 运算符在 Observable 类中定义。那么,您将如何在观察器上实现 LINQ 运算符(如 where)以执行一些筛选?下面是筛选器运算符 Where 的简单示例,用于 IEnumerable 和 IObservable(简化以进行比较)。在 IEnumerable 的情况下,当我们完成遍历时,您将释放枚举器。

//Where for IEnumerable

static IEnumerable<T> Where<T>(IEnumerable<T> source, Func<T, bool> predicate)
{
    // foreach(var element in source)
    //   if (predicate(element))
    //        yield return element;

    using (var enumerator = source.GetEnumerator())
    {
        while (enumerator.MoveNext())
        {
            var value= enumerator.Current;
            if (predicate(value))
            {
                yield return value;
            }
        }
    }
}

//Where for IObservable

static  IObservable<T> Where<T>(this IObserver<T> source, Func<T, bool> predicate)
{
   return Observable.Create<T>(observer =>
   {
       return source.Subscribe(Observer.Create<T>(value =>
           {
               try
               {
                   if (predicate(value)) observer.OnNext(value);
               }
               catch (Exception e)
               {
                   observer.OnError(e);
               }
           }));
   });
}

现在,看看IObservable的Where Implementation。在这种情况下,我们将 IDisposable 句柄返回到可观察量,以便我们可以释放它以停止订阅。对于过滤,我们只是创建一个内部可观察量,我们订阅源代码以在该内部应用我们的过滤逻辑 - 然后创建另一个订阅我们创建的内部可观察量的顶级可观察量。现在,您可以拥有包装事件源的 IObservable 的任何具体实现,然后您可以使用 Where!!凉。反应式扩展中的 Observable 类有一些帮助程序方法来从事件创建可观察量,如 FromEvent。让我们创建一个可观察量,并立即查询事件。幸运的是,Rx 团队已经拥有了可观察量和相关查询运算符的整个实现,因此我们最终不会像这样编写客户查询运算符。

您可以为安装包 Rx-Main 执行 nuget 以安装 Rx,并尝试此示例,该示例显示事件筛选。

//Let us print all ticks between 5 seconds and 20 seconds

//Interval in milli seconds
var timer = new Timer() { Interval = 1000 };
timer.Start();

//Create our event stream which is an Observable
var eventStream = Observable.FromEventPattern<ElapsedEventArgs>(timer, "Elapsed");
var nowTime = DateTime.Now;

//Same as eventStream.Where(item => ...);
var filteredEvents = from e in eventStream
                     let time = e.EventArgs.SignalTime
                     where
                         time > nowTime.AddSeconds(5) &&
                         time < nowTime.AddSeconds(20)
                     select e;
//Subscribe to our observable
filteredEvents.Subscribe(t => Console.WriteLine(DateTime.Now));
Console.WriteLine("Let us wait..");

//Dispose filteredEvents explicitly if you want
Console.ReadKey();

当然,在上面的例子中,我们可以使用 Observable.Timer – 但我只是想展示如何使用可观察量包装外部事件源。同样,您可以包装鼠标事件或 WPF 事件。您可以在此处探索有关 Rx 和可观察量以及一些应用程序的更多信息。现在让我们继续讨论IQbservables。

IQbservable

现在,让我们专注于IQbservable。IQbservable 是 IObserver 的对应物,用于将推送序列/事件源上的查询表示为表达式,就像 IQueryable 是 IEnumerable 的对应物一样。那么,这到底意味着什么呢?如果您检查IQbservable,您可以看到:

public interface IQbservable<out T> : IQbservable, IObservable<T>
{
}

public interface IQbservable
{
    Type ElementType { get; }
    Expression Expression { get; }
    IQbservableProvider Provider { get; }
} 

您可以看到它有一个表达式属性来表示 LINQ to Observable 查询,就像 IQueryable 有一个表达式来表示 LINQ 查询的 AST 一样。IQbservableProvider 负责将表达式转换为远程事件源(可能是云中的流服务器)的语言。

交互式扩展

交互式扩展的核心是针对 IEnumerable 的许多新扩展方法,即它向对象查询运算符添加了许多实用程序 LINQ。您可能在帮助程序或实用程序类中的某个地方手动编码了其中一些实用程序扩展方法,但现在 Rx 团队将其中的许多方法聚合在一起。此外,这篇文章假设你熟悉 C# 中的冷 IEnumerable 模型和迭代器。基本上,C# 编译器的作用是,它采用一个 yield return 语句,并为每个迭代器生成一个类。因此,在某种程度上,每个 C# 迭代器在内部保存一个状态机。您可以使用反射器或其他东西在返回IEnumerator的方法上检查这一点。或者更好的是,我的朋友Abhishek Sur在这里有一篇很酷的帖子,或者这篇关于在C#中实现迭代器的文章。

有关交互式扩展的更多信息
启动 C# 控制台应用程序,并使用安装包 Ix-Main 安装交互式扩展包。您可以在 System.Interactive 中探索 System.Linq.EnumerationsEx 命名空间.dll - 现在,让我们探索一些添加到 IEnumerable 的有用扩展方法。

检查交互式扩展中的几个实用程序方法

让我们快速检查一些有用的实用程序方法。

DO

最简单的Do版本非常有趣。当我们利用迭代器进行枚举时,它会懒惰地对序列中的每个元素调用一个操作。

//Let us create a set of numbers
var numbers = new int[] { 30, 40, 20, 40 };
var result=numbers.Do(n=>Console.WriteLine(n));

Console.WriteLine("Before Enumeration");

foreach(var item in result)
{
    //The action will be invoked when we actually enumerate
}
Console.WriteLine("After Enumeration");

结果如下。请注意,当我们枚举时,将应用该操作(在本例中为用于打印值的 Console.WriteLine)。

现在,最简单的 Do 方法版本的实现是这样的:如果你在 CodePlex 中快速浏览一下交互式扩展源代码,你可以看到我们的 Do 方法实际上是如何实现的。这是一个缩短的版本:

public static class StolenLinqExtensions
{
    public static IEnumerable<TSource> StolenDo<TSource>(
            this IEnumerable<TSource> source, Action<TSource> onNext)
    {
        //Get the enumerator
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                //Move next
                if (!e.MoveNext())
                    break;
                var current = e.Current;

                //Call our action on top of the current item
                onNext(current);

                //Yield return
                yield return current;
            }
        }
    }
}
DoWhile

DoWhile 在 Rx 中很有趣。它通过重复源序列直到给定条件为真来生成可枚举序列。

IEnumerable<TResult> DoWhile<TResult>(IEnumerable<TResult> source, Func<bool> condition)

正如预期的那样,您将看到 foreach 循环反复枚举结果,直到我们满足 DateTime.Now < then 条件 – 即,直到我们达到 10 秒。

Scan

Scan 将采取一个序列,以应用累加器函数来生成累积值序列。例如,让我们创建一个简单的求和累加器,它将采用一组数字来累积每个数字与前一个数字的总和:

var numbers = new int[] { 10, 20, 30, 40 };
 
//0 is just the starting seed value
var results = numbers.Scan(0,(sum, num) => sum+num);

//Print Results. Results will contain 10, 30, 60, 100

//0+10=10
//10+20 = 30
//30 + 30 = 60
//60 + 40 = 100

您可以从 CodePlex 中的 Rx 存储库中查看实际的 Scan 实现。这是一个缩写版本:

IEnumerable<TAccumulate> StolenScan<TSource, TAccumulate>
   (this IEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, 
                       TSource, TAccumulate> accumulator)
{
    var acc = seed;
    foreach (var item in source)
    {
        acc = accumulator(acc, item);
        yield return acc;
    }
}
结论

我们只是触及了冰山一角,因为这篇文章的目的是向您介绍 Ix 和 Rx。巴特·德·斯梅特(Bart De Smet)在这里有一个非常令人兴奋的演讲,您不应该错过。Ix因其功能根源而特别有趣。查看 CodePlex 中的反应式扩展存储库以获取更多灵感,这应该会让您对一些功能模式有更多想法。您也可以使用 Ix 提供程序和 Ix 异步包。

让我冒昧地嵌入查尔斯创作的图画,这是巴特在白板上绘制的抽象图画的具体表现。这是这篇文章的摘要。

参考

https://www.codeproject.com/articles/646361/reactive-programming-for-net-and-csharp-developers

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rx学习之面向 .NET 和 C# 开发人员的响应式编程 - IEnumerable、IQueryable、IObservable 和 IQbservable 简介 的相关文章

  • 串级PID控制四轴飞行器状态-实现

    参考网页 xff1a http blog csdn net nemol1990 article details 45131603 一 上下运动 向上运动 xff0c 四个轴的速度同时增加向下运行 xff0c 四个轴的速度同时减小 二 前后运
  • c和c++调用Python

    参考网页 xff1a https docs python org 3 6 c api index html http blog csdn net taiyang1987912 article details 44779719 http bl
  • yoloV3 darknet GPU手把手从编译到训练再到C++调用API

    我要先声明一下 xff0c 系统是Ubuntu18 04 xff0c 我的机器已经装好了opencv4 1 1 和 cuda10 0 的 xff0c 过程可参考我另一篇博客 xff0c 这部分不再说明 IDE使用qtcreator 1 下载
  • 使用Git命令创建新分支

    1 进入Git Bash Here xff0c 查看仓库所有分支git branch a 2 使用命令 git checkout master xff0c 表示切换到master分支 xff0c 并git pull拉取最新代码 3 由于的所
  • VINS-初探(一)

    vins是香港科技大学开源的一个单目相机结合IMU的一个VIO xff0c 在github上可以下载源码 xff0c 分为iOS系统下的和ros系统下的两种 xff0c ros下的为 span class hljs label https
  • ROS使用官方包进行串口通信

    https www jianshu com p c30f390427e7 参考http span class hljs comment www roswiki com read php tid 61 557 amp fid 61 39 sp
  • STM32F446ZET6关于使用PA11,PA12,PB14,PB15的使用注意事项

    STM32F446ZET6关于使用PA11 xff0c PA12 xff0c PB14 xff0c PB15的使用注意事项 这两天在用PB14做输入时 xff0c 发现总是不灵敏 xff0c 开始还以为布线有问题 xff0c 检查后发现不是
  • numpy基本方法总结

    NumPy基本方法 一 数组方法 创建数组 xff1a arange 创建一维数组 xff1b array 创建一维或多维数组 xff0c 其参数是类似于数组的对象 xff0c 如列表等 读取数组元素 xff1a 如a 0 a 0 0 数组
  • 如何实现Qt上位机软件串口的按字节数据处理?

    1 Qt串口类的选择 最近在做一个小项目 xff0c 需要用到Qt做上位机软件 xff0c 本人也是边学边做 xff0c 买了本 QtCreator快速入门 看了看就动手了 xff0c 由于初学不是很了解Qt5的官方资源 xff0c 一开始
  • 如何使用HAL库手动修改OSC引脚为PD0/1?

    CubeMX不能直接重映射OSC引脚为PD0 1 xff0c 那么 xff0c 如何使用HAL库手动修改OSC引脚为PD0 1 如下图所示配置即可 xff1a
  • “野火FreeRTOS教程”第7章补充知识点-异常流程

    一 知识点 1 Cortex M3 4在复位后CONTROL寄存器初始值为0 xff0c 也就是说MCU会处于线程模式 具有特权访问权限且使用主栈指针 MSP 2 当进入异常时CM3会自动入栈 xff0c 如下图所示 xff1a 3 当异常
  • mini四旋翼飞行器DIY日志

    一 方案 功能描述 xff1a 具备mini四旋翼飞行器的基本功能 xff0c 可以拓展其他模块实现定高 对航向角yaw的校准 xff0c 将所有io引出并设计出最小系统板子功能 xff0c 将设计I2C总线挂载选择电路便于调试和使用 可以
  • Quartus II 13.1.0.162三件套安装包

    QuartusSetup 13 1 0 162 链接 https pan baidu com s 1B01zWG76kfNcGLA0VmwyMw 提取码 jjdd ModelSimSetup 13 1 0 162 链接 https pan
  • DSP Builder安装时的注意事项

    注意事项1 DSP Builder是以组件的形式安装在altera下面的 xff0c 并且altera要与matlab 32位 xff01 xff01 安装在同一路径下 xff0c 这个路径不要带有中文 xff0c 或者空格字符 xff0c
  • Maven3.6.1下载与配置,超详细

    Maven3 6 1下载与配置 xff0c 超详细Maven3 6 1下载与配置 xff0c 超详细Maven3 6 1下载与配置 xff0c 超详细 Maven下载与配置 1 官网下载对应版本 xff0c 推荐下载免安装版 下载地址 ht
  • 【GIS】GIS矢量空间分析(上)

    0 GIS的基本概念 栅格数据与矢量数据 上图中 xff0c a为图形模拟表示的地理对象 xff0c b为控件对象对应的栅格数据模型表示 xff0c c为对应的矢量数据模型表示 矢量模型的表达源于原型空间实体本身 xff0c 通常以坐标来定
  • ROS简介-从零开始讲解ROS(适合超零基础阅读)

    1 前言 笔者以前是机械专业 xff0c 对于计算机方面的学习是少之又少 xff0c 接触机器人的学习之后 xff0c 比如路径规划 算法等 xff0c 发现很难入门 xff0c 不过慢慢摸爬滚打之后还是有了一些认识 xff0c 俗话说的好
  • C++primer plus第六版课后编程练习答案14.1

    include lt iostream gt include lt string gt using namespace std template lt class T1 class T2 gt class Pair private T1 a
  • 区块链——脱坑truffle

    使用truffle构建一个智能合约 实现输出 helloworld 的功能 网上有很多帖子 但也有很多坑 这里展示我的搭建过程 帮助大家绕过那些麻烦 一 安装web3 solc truffle npm g install solc npm
  • ucosii消息队列使用

    ucosii消息队列简介 ucosii的消息队列源码定义在os q c文件 xff1b 接口全部声明在ucos ii h xff0c 总共有如下接口 xff1a span class token keyword void span span

随机推荐