我的问题有点像内格尔算法 https://en.wikipedia.org/wiki/Nagle%27s_algorithm是为了解决问题而创建的,但不完全是为了解决问题。我想要的是缓冲OnNext
通知来自IObservable<T>
成一个序列IObservable<IList<T>>
就像这样:
- 当第一个
T
通知到达,将其添加到缓冲区并开始倒计时
- 如果另一个
T
倒计时结束前通知到达,将其添加到缓冲区并重新开始倒计时
- 一旦倒计时结束(即生产者已经沉默一段时间),转发所有缓冲的
T
通知作为单一聚合IList<T>
通知。
- 如果在倒计时到期之前缓冲区大小超出某个最大值,则无论如何都要发送。
IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)
看起来很有希望,但它似乎定期发送聚合通知,而不是执行我想要的“当第一个通知到达时启动计时器并在其他通知到达时重新启动计时器”行为,并且它还在如果下面没有生成通知,则每个时间窗口结束。
I do not想要删除任何一个T
通知;只是缓冲它们。
是否存在这样的东西,或者我需要自己编写吗?
SO 上存在一些类似的问题,但不完全像这样。
这是一个可以解决问题的扩展方法。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)