正如 James 所说,Observable.Sample 会为您提供最近产生的值。但是,它是按固定的计时器周期进行采样的,而不是从节流(throttle)窗口中第一个事件发生的时刻开始计时。更重要的是,如果您的采样间隔很长(例如十秒),而事件恰好在某次采样之后立即触发,那么您要等将近十秒才能收到这个新事件。
如果您需要更精确的行为,就需要自己实现一个操作符。我冒昧地替您写了一个。这段代码肯定还需要一些整理,但我相信它可以满足您的要求。
/// <summary>
/// Extra operators for <see cref="IObservable{T}"/> sequences.
/// </summary>
public static class ObservableEx
{
    /// <summary>
    /// Throttles <paramref name="source"/> exactly like the overload that takes a scheduler,
    /// running the timers on <c>Scheduler.Default</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to throttle.</param>
    /// <param name="dueTime">Quiet period after the latest value before it is emitted.</param>
    /// <param name="maxTime">Longest time a burst may be held back, measured from its first value.</param>
    /// <returns>The throttled sequence.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
    {
        return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
    }

    /// <summary>
    /// Emits the most recent value of <paramref name="source"/> once no further value has arrived
    /// for <paramref name="dueTime"/>, or once <paramref name="maxTime"/> has elapsed since the first
    /// value of the current burst, whichever timer fires first.
    /// </summary>
    /// <remarks>
    /// A value still pending when the source completes is emitted just before completion.
    /// A pending value is discarded when the source fails. Disposing the subscription cancels
    /// both timers.
    /// </remarks>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to throttle.</param>
    /// <param name="dueTime">Quiet period after the latest value before it is emitted.</param>
    /// <param name="maxTime">Longest time a burst may be held back, measured from its first value.</param>
    /// <param name="scheduler">Scheduler on which both timers are scheduled.</param>
    /// <returns>The throttled sequence.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        return Observable.Create<T>(o =>
        {
            // Timer callbacks may run on a different thread than the source notifications,
            // so all shared state and every call into the observer is serialized on this gate.
            var gate = new object();
            var hasValue = false;
            T value = default(T);
            var maxTimeDisposable = new SerialDisposable();
            var dueTimeDisposable = new SerialDisposable();

            // Assigning Disposable.Empty disposes whatever timer the serial slot currently holds.
            Action cancelTimers = () =>
            {
                maxTimeDisposable.Disposable = Disposable.Empty;
                dueTimeDisposable.Disposable = Disposable.Empty;
            };

            // Shared by both timers: emits the pending value (if any) and closes the burst.
            Action flush = () =>
            {
                lock (gate)
                {
                    if (!hasValue)
                    {
                        return;
                    }

                    cancelTimers();

                    // Reset the state before calling out so that a value pushed re-entrantly
                    // from inside OnNext starts a fresh burst instead of being lost.
                    var pending = value;
                    hasValue = false;
                    value = default(T);
                    o.OnNext(pending);
                }
            };

            var subscription = source.Subscribe(
                x =>
                {
                    lock (gate)
                    {
                        // Only the first value of a burst starts the maximum-delay timer.
                        if (!hasValue)
                        {
                            maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, flush);
                        }

                        hasValue = true;
                        value = x;

                        // Every value restarts the quiet-period timer.
                        dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, flush);
                    }
                },
                ex =>
                {
                    lock (gate)
                    {
                        cancelTimers();
                        hasValue = false;
                        value = default(T);
                        o.OnError(ex);
                    }
                },
                () =>
                {
                    lock (gate)
                    {
                        // Deliver the last pending value instead of dropping it; flush also
                        // cancels the timers, and no timer is live when nothing is pending.
                        flush();
                        o.OnCompleted();
                    }
                });

            // Tie the timers to the subscription lifetime so unsubscribing cancels them too.
            return new CompositeDisposable(subscription, maxTimeDisposable, dueTimeDisposable);
        });
    }
}
还有一些测试...
/// <summary>
/// Virtual-time tests for <c>ThrottleMax</c>. Every test uses a quiet period of 100 ticks
/// and a maximum delay of 250 ticks, and runs the test scheduler up to tick 1000.
/// </summary>
[TestClass]
public class ThrottleMaxTests : ReactiveTest
{
    private static readonly TimeSpan QuietPeriod = TimeSpan.FromTicks(100);
    private static readonly TimeSpan MaximumDelay = TimeSpan.FromTicks(250);

    /// <summary>
    /// Subscribes a recording observer to the throttled <paramref name="input"/>, advances
    /// <paramref name="clock"/> to tick 1000 and returns the observer for inspection.
    /// </summary>
    private static ITestableObserver<int> Observe(TestScheduler clock, IObservable<int> input)
    {
        ITestableObserver<int> observer = clock.CreateObserver<int>();
        input.ThrottleMax(QuietPeriod, MaximumDelay, clock).Subscribe(observer);
        clock.AdvanceTo(1000);
        return observer;
    }

    [TestMethod]
    public void CanThrottle()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(OnNext(100, 1));

        var observer = Observe(clock, input);

        // A lone value is released one quiet period after it arrived.
        observer.Messages.AssertEqual(OnNext(200, 1));
    }

    [TestMethod]
    public void CanThrottleWithMaximumInterval()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(
            OnNext(100, 1),
            OnNext(175, 2),
            OnNext(250, 3),
            OnNext(325, 4),
            OnNext(400, 5));

        var observer = Observe(clock, input);

        // Values arrive every 75 ticks, so the quiet period never elapses; the maximum
        // delay started at tick 100 releases the latest value at 350. Value 5 then
        // starts a new burst and is released by the quiet period at 500.
        observer.Messages.AssertEqual(
            OnNext(350, 4),
            OnNext(500, 5));
    }

    [TestMethod]
    public void CanThrottleWithoutMaximumIntervalInterferance()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(
            OnNext(100, 1),
            OnNext(325, 2));

        var observer = Observe(clock, input);

        // Each value is released by its own quiet period; the second value must not be
        // affected by the maximum-delay timer that belonged to the first burst.
        observer.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(425, 2));
    }
}