我正在实现一个流量控制组件,限制可以发送的最大请求。每个工作线程可以发送单个请求或一批请求,但任何时候待处理请求的总数都不应超过最大数量。
我最初想用 SemaphoreSlim 来实现:
将信号量初始化为最大请求计数,然后当工作线程要调用服务时,它必须获取足够数量的令牌,但是我发现实际上 SemaphoreSlim 和 Semaphore 只允许线程将信号量计数减少 1,在我的情况下我想要减少工作线程承载的请求数。
我应该在这里使用什么同步原语?
需要澄清的是,该服务支持批处理,因此一个线程可以在一次服务调用中发送 N 个请求,但相应地它应该能够将信号量的当前计数减少 N。
下面是定制的SemaphoreManyFifo
提供方法的类Wait(int acquireCount)
方法和Release(int releaseCount)
。其行为严格遵循先进先出原则。它具有相当不错的性能(在我的 PC 中,8 个线程每秒约 500,000 次操作)。
public class SemaphoreManyFifo : IDisposable
{
private readonly object _locker = new object();
private readonly Queue<(ManualResetEventSlim, int AcquireCount)> _queue;
private readonly ThreadLocal<ManualResetEventSlim> _pool;
private readonly int _maxCount;
private int _currentCount;
public int CurrentCount => Volatile.Read(ref _currentCount);
public SemaphoreManyFifo(int initialCount, int maxCount)
{
// Proper arguments validation omitted
Debug.Assert(initialCount >= 0);
Debug.Assert(maxCount > 0 && maxCount >= initialCount);
_queue = new Queue<(ManualResetEventSlim, int)>();
_pool = new ThreadLocal<ManualResetEventSlim>(
() => new ManualResetEventSlim(false), trackAllValues: true);
_currentCount = initialCount;
_maxCount = maxCount;
}
public SemaphoreManyFifo(int initialCount) : this(initialCount, Int32.MaxValue) { }
public void Wait(int acquireCount)
{
Debug.Assert(acquireCount > 0 && acquireCount <= _maxCount);
ManualResetEventSlim gate;
lock (_locker)
{
Debug.Assert(_currentCount >= 0 && _currentCount <= _maxCount);
if (acquireCount <= _currentCount && _queue.Count == 0)
{
_currentCount -= acquireCount; return; // Fast path
}
gate = _pool.Value;
gate.Reset(); // Important, because the gate is reused
_queue.Enqueue((gate, acquireCount));
}
gate.Wait();
}
public void Release(int releaseCount)
{
Debug.Assert(releaseCount > 0);
lock (_locker)
{
Debug.Assert(_currentCount >= 0 && _currentCount <= _maxCount);
if (releaseCount > _maxCount - _currentCount)
throw new SemaphoreFullException();
_currentCount += releaseCount;
while (_queue.Count > 0 && _queue.Peek().AcquireCount <= _currentCount)
{
var (gate, acquireCount) = _queue.Dequeue();
_currentCount -= acquireCount;
gate.Set();
}
}
}
public void Dispose()
{
foreach (var gate in _pool.Values) gate.Dispose();
_pool.Dispose();
}
}
在上述实现中添加对超时和取消的支持并非易事。它将需要一个不同的(可更新的)数据结构而不是Queue<T>
.
原本的Wait+Pulse实现可以在第一次修订这个答案。它很简单,但缺乏理想的 FIFO 行为。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)