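Here is my take on this problem:
Update: By borrowing ideas from Enigmativity's answer, I was able to greatly simplify my suggested solution. The Observable.StartAsync method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim.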
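As a rough illustration of that cancellation behavior, here is a minimal sketch (separate from the solution itself) suggesting that unsubscribing from the sequence returned by Observable.StartAsync cancels the token handed to the function. The class and method names are made up for this demo, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StartAsyncCancellationDemo
{
    // Hypothetical helper: returns true if disposing the subscription
    // canceled the token that StartAsync passed to the function.
    public static bool DisposeCancelsToken()
    {
        CancellationToken captured = default;

        // StartAsync invokes the function immediately, not on subscription.
        IObservable<int> operation = Observable.StartAsync(async ct =>
        {
            captured = ct;                          // Runs synchronously, before the first await
            await Task.Delay(Timeout.Infinite, ct); // Never completes unless canceled
            return 42;
        });

        IDisposable subscription = operation.Subscribe(_ => { });
        bool canceledBefore = captured.IsCancellationRequested;

        subscription.Dispose(); // Unsubscribing signals the token

        return !canceledBefore && captured.IsCancellationRequested;
    }
}
```

This is why the Switch operator at the end of the query can cancel a pending operation just by unsubscribing from it.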
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}
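The timer branch relies on semaphore.Wait(0) as a non-blocking "try acquire": it returns immediately, with true if the semaphore was free and false if an operation currently holds it. A minimal sketch of just that behavior, using only SemaphoreSlim (the TryAcquireDemo class is a made-up name for illustration):

```csharp
using System;
using System.Threading;

public static class TryAcquireDemo
{
    public static (bool First, bool Second, bool Third) Run()
    {
        using var semaphore = new SemaphoreSlim(1, 1);

        bool first = semaphore.Wait(0);  // Free, so it is acquired
        bool second = semaphore.Wait(0); // Already held, returns false without blocking
        semaphore.Release();             // The "operation" finishes
        bool third = semaphore.Wait(0);  // Free again, so it is acquired

        return (first, second, third);
    }
}
```

Run() returns (true, false, true). The false case corresponds to a timer tick that arrives while an operation is still running, which the query above turns into a null that is then filtered out.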
The out Action manualInvocation parameter is the mechanism that triggers a manual invocation.
Usage example:
int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();
await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);
subscription.Dispose();
Output:
19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic
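The technique of using the Scan and DistinctUntilChanged operators to drop elements while the previous asynchronous operation is running is borrowed from this question.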
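Since the linked question is not reproduced here, the following is only my reading of how such a Scan plus DistinctUntilChanged technique might look, not necessarily the exact code from that question. DropWhileBusy is a hypothetical name, and the sketch assumes the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DropWhileBusyExtensions
{
    // Hypothetical operator: invokes functionAsync for an element only if
    // the previously started task has completed; other elements are dropped.
    public static IObservable<TResult> DropWhileBusy<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> functionAsync)
    {
        return source
            // Keep the previous task while it is still running,
            // otherwise start a new one for the current element.
            .Scan(Task.FromResult<TResult>(default!), (previous, item) =>
                previous.IsCompleted ? Wrap(functionAsync(item)) : previous)
            // A repeated task reference means the element was dropped.
            .DistinctUntilChanged()
            // Propagate the result of each distinct task, in order.
            .Concat();

        // Gives every invocation its own task instance, in case
        // functionAsync hands back a cached or shared task.
        static async Task<TResult> Wrap(Task<TResult> task) => await task;
    }
}
```

Unlike the Switch-based query above, this approach never cancels a running operation. It only ignores elements that arrive while one is in flight.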
¹ It seems that the Rx library does not handle this messy business satisfactorily though, since it just omits disposing of the CancellationTokenSources it creates.