非常有趣的问题!比我最初想象的要棘手得多:-)
我喜欢无锁解决方案,因此我尝试在下面找到一种解决方案。
有很多方法可以思考这个系统。你可以建模
它作为一个固定大小的循环缓冲区/队列(有两个条目),但是然后
您失去了更新下一个可用消费值的能力,
因为你不知道消费者是否已经开始阅读最近发布的内容
值或仍在(可能)阅读前一个。所以额外的状态
为了达到更优化的效果,需要超出标准环形缓冲区的范围
解决方案。
首先请注意,总有一个生产者可以安全写入的单元格
在任何给定时间点;如果消费者正在读取一个单元格,则
其他的都可以写入。我们将可以安全写入的单元格称为
“活动”单元格(可以潜在读取的单元格是任何单元格isn't活跃的那个)。仅当其他小区不活跃时,才可以切换活动小区
当前正在读取。
与始终可以写入的活动单元不同,非活动单元可以
仅当它包含值时才被读取;一旦这个价值被消耗掉,它就消失了。
(这意味着在积极的生产者的情况下可以避免活锁;在某些情况下
此时,消费者将清空电池并将停止触摸电池。一次
发生这种情况时,生产者肯定可以发布一个值,而在此之前,
如果消费者不在其中,它只能发布一个值(更改活动单元格)
读到中间。)
如果有is一个可以被消费的值,只有消费者可以改变它
事实(无论如何,对于非活动单元);后续生产可能会改变哪个单元
处于活动状态且已发布值,但值将始终可供读取,直到
它被消耗了。
一旦生产者完成对活动单元的写入,它可以通过以下方式“发布”该值:
更改哪个单元格是活动单元格(交换索引),前提是消费者是
不在读取另一个单元格的过程中。如果消费者is在中间
读取另一个单元格时,交换不会发生,但在这种情况下,消费者可以交换
后it's完成读取值,前提是生产者不在中间
写入(如果是,生产者将在完成后进行交换)。
事实上,一般来说,消费者在读完之后总是可以交换(如果它是唯一的一个)
访问系统),因为消费者的虚假交换是良性的:如果存在
另一个单元格中的某些内容,然后交换将导致接下来读取该内容,并且如果
没有,交换没有任何影响。
因此,我们需要一个共享变量来跟踪活动单元格是什么,并且我们还需要一个
生产者和消费者都可以指示他们是否处于生产过程中的方式
手术。我们可以将这三个状态按顺序存储到一个原子变量中
能够同时(原子地)影响它们。
我们还需要一种方法让消费者检查里面是否有东西
首先是非活动单元,并且两个线程都可以修改该状态
作为适当的。我尝试了其他几种方法,但最终最简单的就是
也将此信息包含在另一个原子变量中。这让事情变得很多
推理起来更简单,因为系统中的所有状态更改都是原子的。
我想出了一个无等待的实现(无锁,所有操作完成
在有限数量的指令中)。
代码时间!
#include <atomic>
#include <cstdint>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() : m_state(0) { }
~ProducerConsumerDoubleBuffer() { }
// Never returns nullptr
T* start_writing() {
// Increment active users; once we do this, no one
// can swap the active cell on us until we're done
auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
return &m_buf[state & 1];
}
void end_writing() {
// We want to swap the active cell, but only if we were the last
// ones concurrently accessing the data (otherwise the consumer
// will do it for us when *it's* done accessing the data)
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
if ((state & 0x6) == 0) {
// The consumer wasn't in the middle of a read, we should
// swap (unless the consumer has since started a read or
// already swapped or read a value and is about to swap).
// If we swap, we also want to clear the full flag on what
// will become the active cell, otherwise the consumer could
// eventually read two values out of order (it reads a new
// value, then swaps and reads the old value while the
// producer is idle).
m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
}
}
// Returns nullptr if there appears to be no more data to read yet
T* start_reading() {
m_readState = m_state.load(std::memory_order_relaxed);
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// Nothing to read here!
return nullptr;
}
// At this point, there is guaranteed to be something to
// read, because the full flag is never turned off by the
// producer thread once it's on; the only thing that could
// happen is that the active cell changes, but that can
// only happen after the producer wrote a value into it,
// in which case there's still a value to read, just in a
// different cell.
m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
// Now that we've incremented the user count, nobody can swap until
// we decrement it
return &m_buf[(m_readState & 1) ^ 1];
}
void end_reading() {
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// There was nothing to read; shame to repeat this
// check, but if these functions are inlined it might
// not matter. Otherwise the API could be changed.
// Or just don't call this method if start_reading()
// returns nullptr -- then you could also get rid
// of m_readState.
return;
}
// Alright, at this point the active cell cannot change on
// us, but the active cell's flag could change and the user
// count could change. We want to release our user count
// and remove the flag on the value we read.
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
// Oi, we were the last ones accessing the data when we released our cell.
// That means we should swap, but only if the producer isn't in the middle
// of producing something, and hasn't already swapped, and hasn't already
// set the flag we just reset (which would mean they swapped an even number
// of times). Note that we don't bother swapping if there's nothing to read
// in the other cell.
m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
}
}
private:
T m_buf[2];
// The bottom (lowest) bit will be the active cell (the one for writing).
// The active cell can only be switched if there's at most one concurrent
// user. The next two bits of state will be the number of concurrent users.
// The fourth bit indicates if there's a value available for reading
// in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
std::atomic<std::uint32_t> m_state;
std::uint32_t m_readState;
};
请注意,语义是消费者永远不能两次读取给定值,
并且它读取的值总是比它最后读取的值新。也还算可以
内存使用效率高(两个缓冲区,就像您原来的解决方案一样)。我避免了 CAS 循环
因为它们通常比争用的单个原子操作效率低。
如果您决定使用上面的代码,我建议您首先为其编写一些全面的(线程)单元测试。
和适当的基准。我确实测试过,但只是勉强测试。如果您发现任何错误,请告诉我:-)
我的单元测试:
ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_writing();
if (item != nullptr) { // Always true
*item = i;
}
buf.end_writing();
}
});
std::thread consumer([&]() {
int prev = -1;
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_reading();
if (item != nullptr) {
assert(*item > prev);
prev = *item;
}
buf.end_reading();
}
});
producer.join();
consumer.join();
至于你原来的实现,我只是粗略地看了一下(这更有趣
设计新东西,呵呵),但 david.pfx 的答案似乎解决了你问题的这一部分。