在 Rx 中,当使用 Scheduler.NewThread 作为 ObserveOn 方法时,当 Rx 已经保证 OnNext 永远不会重叠时,让每个观察委托 (OnNext) 在新线程上运行有什么好处。如果每个 OnNext 都会被一个接一个地调用,为什么每个都需要新的线程呢?
我理解为什么人们想要在与订阅和应用程序线程不同的线程上运行观察委托,但在新线程上运行每个观察委托,而它们永远不会并行运行?......对我或我来说没有意义我在这里缺少一些东西吗?
例如
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace RxTesting
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);
var numbers = from number in Enumerable.Range(1,10) select Process(number);
var observableNumbers = numbers.ToObservable()
.ObserveOn(Scheduler.NewThread)
.SubscribeOn(Scheduler.NewThread);
observableNumbers.Subscribe(
n => Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId));
Console.ReadKey();
}
private static int Process(int number)
{
Thread.Sleep(500);
Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
Thread.CurrentThread.ManagedThreadId);
return number;
}
}
}
上面的代码产生以下结果。请注意,每次消费都是在一个新线程上完成的。
Application Thread : 8
Producing : 1 on Thread : 9
Consuming : 1 on Thread : 10
Producing : 2 on Thread : 9
Consuming : 2 on Thread : 11
Producing : 3 on Thread : 9
Consuming : 3 on Thread : 12
Producing : 4 on Thread : 9
Consuming : 4 on Thread : 13
Producing : 5 on Thread : 9
Consuming : 5 on Thread : 14
Producing : 6 on Thread : 9
Consuming : 6 on Thread : 15
Producing : 7 on Thread : 9
Consuming : 7 on Thread : 16
Producing : 8 on Thread : 9
Consuming : 8 on Thread : 17
Producing : 9 on Thread : 9
Consuming : 9 on Thread : 18
Producing : 10 on Thread : 9
Consuming : 10 on Thread : 19
NewThread 调度程序对于长时间运行的订阅者很有用。如果您未指定任何调度程序,则生产者将被阻止等待订阅者完成。通常,您可以使用 Scheduler.ThreadPool,但如果您希望有很多长时间运行的任务,您将不希望用它们阻塞您的线程池(因为它可能被多个可观察的订阅者使用) )。
例如,请考虑对您的示例进行以下修改。我将延迟移至订阅者,并添加了主线程何时准备好键盘输入的指示。请注意取消注释 NewThead 行时的差异。
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace RxTesting
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);
var numbers = from number in Enumerable.Range(1, 10) select Process(number);
var observableNumbers = numbers.ToObservable()
// .ObserveOn(Scheduler.NewThread)
// .SubscribeOn(Scheduler.NewThread)
;
observableNumbers.Subscribe(
n => {
Thread.Sleep(500);
Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
});
Console.WriteLine("Waiting for keyboard");
Console.ReadKey();
}
private static int Process(int number)
{
Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
Thread.CurrentThread.ManagedThreadId);
return number;
}
}
}
那么为什么 Rx 不优化为每个订阅者使用相同的线程呢?如果订阅者运行时间太长以至于您需要一个新线程,那么线程创建开销无论如何都是微不足道的。一个例外是,如果大多数订阅者都很短,但少数订阅者长时间运行,那么重用同一线程的优化确实会很有用。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)