这里有一个RateLimiter
您可以使用该类来限制异步操作的频率。这是一个更简单的实现RateLimiter
找到的类this answer.
/// <summary>
/// Limits the number of workers that can access a resource, during the specified
/// time span.
/// </summary>
public class RateLimiter
{
private readonly SemaphoreSlim _semaphore;
private readonly TimeSpan _timeUnit;
public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
{
if (maxActionsPerTimeUnit < 1)
throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
throw new ArgumentOutOfRangeException(nameof(timeUnit));
_semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
_timeUnit = timeUnit;
}
public async Task WaitAsync(CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Schedule the release of the semaphore using a Timer.
// Use the newly created Timer object as the state object, to prevent GC.
// Handle the unlikely case that the _timeUnit is invalid.
System.Threading.Timer timer = new(_ => _semaphore.Release());
try { timer.Change(_timeUnit, Timeout.InfiniteTimeSpan); }
catch { _semaphore.Release(); throw; }
}
}
使用示例:
List<string> urls = GetUrls();
using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));
string[] documents = await Task.WhenAll(urls.Select(async url =>
{
await rateLimiter.WaitAsync();
return await _httpClient.GetStringAsync(url);
}));
在线演示.
The Timer是用这个特定的构造函数以防止它在触发之前被垃圾收集,如中所述这个答案作者:尼克·H.
Note:这个实现有点泄漏,因为它创建了内部一次性的System.Threading.Timer
对象,当您使用完后不会被处置RateLimiter
。任何活动的计时器都会阻止RateLimiter
从被垃圾收集直到这些计时器触发它们的回调。还有SemaphoreSlim未处置正如它应该。这些都是小缺陷,不太可能影响仅创建少数内容的程序RateLimiter
s。如果您打算创建很多,您可以看看第三次修订这个答案的特点是一次性RateLimiter
基于Task.Delay
method.
这是一个替代实现RateLimiter
类,更复杂,它基于Environment.TickCount64属性而不是SemaphoreSlim
。它的优点是不会在后台创建“即发即忘”计时器。缺点是WaitAsync
方法不支持CancellationToken
争论,并且由于复杂性,出现错误的可能性更高。
public class RateLimiter
{
private readonly Queue<long> _queue;
private readonly int _maxActionsPerTimeUnit;
private readonly int _timeUnitMilliseconds;
public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
{
// Arguments validation omitted
_queue = new Queue<long>();
_maxActionsPerTimeUnit = maxActionsPerTimeUnit;
_timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
}
public Task WaitAsync()
{
int delayMilliseconds = 0;
lock (_queue)
{
long currentTimestamp = Environment.TickCount64;
while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
{
_queue.Dequeue();
}
if (_queue.Count >= _maxActionsPerTimeUnit)
{
long refTimestamp = _queue
.Skip(_queue.Count - _maxActionsPerTimeUnit).First();
delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
Debug.Assert(delayMilliseconds >= 0);
if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
}
_queue.Enqueue(currentTimestamp + delayMilliseconds
+ _timeUnitMilliseconds);
}
if (delayMilliseconds == 0) return Task.CompletedTask;
return Task.Delay(delayMilliseconds);
}
}