我是 Rx 新手,我在想当 IObservable 非常快地产生大量事件而 OnNext 需要很长时间时会发生什么。我猜想新事件会在内部以某种方式排队,这样我就可以运行我们的内存。我对吗?考虑下面的小例子:
Subject<int> subject = new Subject<int>();
subject.ObserveOn(Scheduler.ThreadPool).Subscribe(x => { Console.WriteLine("Value published: {0}", x); Thread.Sleep(100); },
() => Console.WriteLine("Sequence Completed."));
for (int i = 0; i < 10; i++)
{
subject.OnNext(i);
}
我发布了 10 个事件,而消费者方法非常慢。所有事件都经过处理,因此必须缓存在内存中。如果我发布很多事件,我的内存就会耗尽,对吗?或者我错过了什么?
有没有办法限制反应式扩展中待处理事件的数量?例如,如果有超过 5 个待处理事件,我想忽略新事件。
是的,你是对的,慢的消费者会造成排队。最接近您所要求的内置运算符是Observable.Sample
- 然而,这会丢弃较旧的事件,转而支持较新的事件。这是更常见的要求,因为它可以让缓慢的观察者赶上。
让我知道 Sample 是否足够,因为您描述的缓冲行为是一个不寻常的要求 - 它是可以实现的,但实现起来相当复杂,并且需要相当重要的代码。
编辑 - 如果您像这样使用示例,它将在每次 OnNext 之后返回最新事件(您需要提供一个调度程序才能使其工作 - 并且 NewThreadScheduler 创建一个线程每次订阅,不是每个事件:
void Main()
{
var source = Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(100);
source.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
.Subscribe(SlowConsumer);
Console.ReadLine();
}
private void SlowConsumer(long item)
{
Console.WriteLine(item + " " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(1));
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)