我想设置一个 Rx 订阅,它可以立即响应事件,然后忽略指定“冷却”期内发生的后续事件。
开箱即用的 Throttle/Buffer 方法仅在超时结束后才会响应,这并不是我所需要的。
这是一些设置场景并使用 Throttle 的代码(这不是我想要的解决方案):
class Program
{
static Stopwatch sw = new Stopwatch();
static void Main(string[] args)
{
var subject = new Subject<int>();
var timeout = TimeSpan.FromMilliseconds(500);
subject
.Throttle(timeout)
.Subscribe(DoStuff);
var factory = new TaskFactory();
sw.Start();
factory.StartNew(() =>
{
Console.WriteLine("Batch 1 (no delay)");
subject.OnNext(1);
});
factory.StartNewDelayed(1000, () =>
{
Console.WriteLine("Batch 2 (1s delay)");
subject.OnNext(2);
});
factory.StartNewDelayed(1300, () =>
{
Console.WriteLine("Batch 3 (1.3s delay)");
subject.OnNext(3);
});
factory.StartNewDelayed(1600, () =>
{
Console.WriteLine("Batch 4 (1.6s delay)");
subject.OnNext(4);
});
Console.ReadKey();
sw.Stop();
}
private static void DoStuff(int i)
{
Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
}
}
现在运行的输出是:
第 1 批(无延迟)
在 508ms 处处理 1
第 2 批(1 秒延迟)
第 3 批(1.3 秒延迟)
第 4 批(1.6 秒延迟)
在 2114ms 处处理 4
请注意,批次 2 未被处理(这很好!),因为由于节流的性质,我们在请求之间等待 500 毫秒。批次 3 也没有被处理(这不太好,因为它距离批次 2 发生了超过 500 毫秒),因为它接近批次 4。
我正在寻找的是更像这样的东西:
第 1 批(无延迟)
在 ~0ms 处处理 1
第 2 批(1 秒延迟)
在约 1000 秒内处理 2
第 3 批(1.3 秒延迟)
第 4 批(1.6 秒延迟)
在 ~1600s 处理 4
请注意,在这种情况下不会处理批次 3(这很好!),因为它发生在批次 2 的 500 毫秒内。
EDIT:
这是我使用的“StartNewDelayed”扩展方法的实现:
/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
this TaskFactory factory, int millisecondsDelay)
{
return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}
/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
// Validate arguments
if (factory == null) throw new ArgumentNullException("factory");
if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");
// Create the timed task
var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
var ctr = default(CancellationTokenRegistration);
// Create the timer but don't start it yet. If we start it now,
// it might fire before ctr has been set to the right registration.
var timer = new Timer(self =>
{
// Clean up both the cancellation token and the timer, and try to transition to completed
ctr.Dispose();
((Timer)self).Dispose();
tcs.TrySetResult(null);
});
// Register with the cancellation token.
if (cancellationToken.CanBeCanceled)
{
// When cancellation occurs, cancel the timer and try to transition to cancelled.
// There could be a race, but it's benign.
ctr = cancellationToken.Register(() =>
{
timer.Dispose();
tcs.TrySetCanceled();
});
}
if (millisecondsDelay > 0)
{
// Start the timer and hand back the task...
timer.Change(millisecondsDelay, Timeout.Infinite);
}
else
{
// Just complete the task, and keep execution on the current thread.
ctr.Dispose();
tcs.TrySetResult(null);
timer.Dispose();
}
return tcs.Task;
}