使用 bufferClosingSelector 工厂方法
decPL 建议使用重载Buffer
接受一个bufferClosingSelector
- 在打开新缓冲区时调用的工厂函数。它产生一个流,其第一个OnNext()
or OnCompleted()
信号刷新当前缓冲区。 decPL 代码如下所示:
observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
这在解决方案方面取得了相当大的进展,但也存在一些问题:
- 在限制持续时间内持续发布消息的活动期间,服务器不会发送消息。这可能会导致大量且不经常发布的列表。
- 源有多个订阅;如果天气冷,可能会产生意想不到的副作用。这
bufferClosingSelector
工厂被称为each缓冲区关闭,因此如果源很冷,它将从初始事件而不是最近的事件中进行限制。
防止无限期节流
我们需要使用额外的机制来限制缓冲区长度并防止无限期的限制。Buffer
有一个重载,允许您指定最大长度,但不幸的是您不能将其与关闭选择器结合使用。
让我们调用所需的缓冲区长度限制n。回忆一下第一个OnNext
关闭选择器的 足以关闭缓冲区,所以我们需要做的就是Merge
带有发送计数流的节流阀OnNext
after n来自源头的事件。我们可以用.Take(n).LastAsync()
去做这个;采取第一个n事件,但忽略除最后一个之外的所有事件。这是 Rx 中非常有用的模式。
让源头变得“热”
为了解决该问题bufferClosingSelector
工厂重新订阅源,我们需要使用通用模式.Publish().RefCount()
在源上为我们提供一个仅向订阅者发送最新事件的流。这也是一个非常有用的模式,需要记住。
Solution
这是修改后的代码,其中节流持续时间与计数器合并:
var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;
// single subscription to source
var sourcePub = source.Publish().RefCount();
var output = sourcePub.Buffer(
() => sourcePub.Throttle(throttleDuration)
.Merge(sourcePub.Take(bufferSize).LastAsync()));
生产就绪代码和测试
这是一个带有测试的生产就绪实现(使用 nuget 包 rx-testing 和 nunit)。请注意调度程序的参数化以支持测试。
public static partial class ObservableExtensions
{
public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
this IObservable<TSource> source,
TimeSpan maxInterval,
int maxBufferSize,
IScheduler scheduler)
{
if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
if (maxBufferSize <= 0)
throw new ArgumentOutOfRangeException(
"maxBufferSize", "maxBufferSize must be positive");
var publishedSource = source.Publish().RefCount();
return publishedSource.Buffer(
() => publishedSource
.Throttle(maxInterval, scheduler)
.Merge(publishedSource.Take(maxBufferSize).LastAsync()));
}
}
public class BufferNearEventsTests : ReactiveTest
{
[Test]
public void CloseEventsAreBuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 1000;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
IList<int> expectedBuffer = new [] {1, 2, 3};
var expectedTime = maxInterval.Ticks + 300;
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(1000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
}
[Test]
public void FarEventsAreUnbuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 1000;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(1000, 1),
OnNext(2000, 2),
OnNext(3000, 3));
IList<int>[] expectedBuffers =
{
new[] {1},
new[] {2},
new[] {3}
};
var expectedTimes = new[]
{
maxInterval.Ticks + 1000,
maxInterval.Ticks + 2000,
maxInterval.Ticks + 3000
};
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(10000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
}
[Test]
public void UpToMaxEventsAreBuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 2;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
IList<int>[] expectedBuffers =
{
new[] {1,2},
new[] {3}
};
var expectedTimes = new[]
{
200, /* Buffer cap reached */
maxInterval.Ticks + 300
};
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(10000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
}
private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
{
CollectionAssert.AreEquivalent(expected, actual);
return true;
}
}