Forward
这个答案基于 Rx 团队直接给出的明确解释this http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx帖子 - 请注意,它很长,涵盖的内容远不止这一点。转到标题为的部分在 Rx 查询运算符中利用“异步”一切都得到了解释,包括一个具体的例子ScheduleAsyc
在标题为通过“await”使调度程序更易于使用
这是我尝试解释的:
Summary
的主要动机ScheduleAsync
的目的是采用 C# 5 的异步/等待功能,以简化对许多事件执行“公平”调度的代码编写,否则可能会导致调度程序缺乏其他操作。这就是“协作调度”的含义——与共享调度程序的其他代码很好地合作。您可以通过安排下一个事件,然后放弃控制权直到该事件触发并挂钩到该事件来安排下一个事件,依此类推。
在 Rx 2.0 之前,这是通过递归调度实现的。
朴素的例子
以下是链接文章中的示例,它提供了 Range 运算符的实现。这种实现很糟糕,因为它不让出控制权,导致调度程序挨饿:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(() =>
{
for (int i = 0; i < count; i++)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(start + i);
}
observer.OnCompleted();
});
});
}
请注意 OnNext 如何位于循环中,在不放弃控制的情况下锤击调度程序(如果调度程序是单线程的,情况尤其糟糕)。它使其他操作失去了安排其操作的机会,并且不允许在取消时中止。我们该如何解决这个问题呢?
递归调度 - Pre-Rx 2.0 解决方案
这是通过递归调度来解决这个问题的旧方法 - 很难看出发生了什么。这不是“命令式编码风格”。递归调用self()
当你第一次看到它时,它是相当令人大脑融化的不透明——对我来说是第十次,尽管我最终得到了它。传奇人物巴特·德·斯梅特 (Bart de Smet) 的这篇经典文章将告诉您有关这项技术的更多信息 http://bartdesmet.net/blogs/bart/archive/2009/11/08/jumping-the-trampoline-in-c-stack-friendly-recursion.aspx。无论如何,这是递归风格:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(0, (i, self) =>
{
if (i < count)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(start + i);
self(i + 1); /* Here is the recursive call */
}
else
{
observer.OnCompleted();
}
});
});
}
为了更加公平,如果订阅被处置,下一个待处理的计划操作将被取消。
新的异步/等待风格
这是通过 async/await 的编译器转换进行延续的新方法,它允许“命令式编码风格”。请注意,与递归风格相比,动机是更易读 - async/await 脱颖而出,以 .NET 惯用的方式显示正在发生的情况:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
for (int i = 0; i < count; i++)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(i);
await ctrl.Yield(); /* Use a task continuation to schedule next event */
}
observer.OnCompleted();
return Disposable.Empty;
});
});
}
它看起来就像一个 for 循环,但实际上await ctrl.Yield()
将让出控制权,允许其他代码访问调度程序。它使用任务延续一次只调度一个事件 - 也就是说,每次迭代仅在前一个迭代完成后才会发布到调度程序,从而避免调度程序上直接出现长队列。取消也有效,这次 Rx 框架将订阅的处理转换为通过传入的取消令牌ct
.
我建议阅读原帖 http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx如果链接仍然有效,我就从这里拿走了!