我正在尝试找到一种方法,以无锁或非阻塞的方式为单个消费者/单个消费者创建环形缓冲区,该缓冲区将覆盖缓冲区中最旧的数据。我读过很多无锁算法,当缓冲区已满时“返回 false”时,这些算法就会起作用——即,不添加;但我什至找不到伪代码来说明当您需要覆盖最旧的数据时如何执行此操作。
我正在使用 GCC 4.1.2(工作中的限制,我无法升级版本...)并且我有 Boost 库,过去我制作了自己的 Atomic 变量类型,该类型与即将推出的规范(它并不完美,但它是线程安全的并且可以满足我的需要)。
当我思考时,我认为使用这些原子确实应该解决这个问题。关于我的想法的一些粗略的伪代码:
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;
unsigned int getNextIndex(unsigned int i)
{
return (i + 1 ) % size;
}
public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
if(writeIndex == getNextIndex(readIndex)) //1
{
readIndex = getNextIndex(readIndex); //2
}
buf[writeIndex] = t;
writeIndex = getNextIndex(writeIndex); //3
}
bool consume(T& t)
{
if(readIndex == writeIndex) //4
return false;
t = buf[readIndex];
readIndex = getNexIndex(readIndex); //5
return true;
}
};
据我所知,这里不存在死锁情况,因此我们可以安全地避免这种情况(如果我上面的实现即使在伪代码级别上也是错误的,那么建设性的批评总是值得赞赏的)。
然而,我能找到的最大竞争条件是:
假设缓冲区已满。即writeIndex +1 = readIndex;
(1) 发生,就像调用consume 一样。这是真的
(4) 为 false,因此我们从缓冲区中读取
(5) 发生,并且 readIndex 提前一位(因此实际上缓冲区中有空间
(2) 发生,再次推进 readIndex,从而丢失该值。
基本上,这是一个经典问题,作者必须修改读者,从而导致竞争条件。如果每次访问它时都没有实际阻止整个列表,我想不出一种方法来防止这种情况发生。我缺少什么?
- 从具有适当进度保证的单个生产者/多个消费者队列开始。
- 如果队列已满并且推送失败,则弹出一个值。那么就会有空间来推动新的价值。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)