我需要在预定义数量的工作线程(通过应用程序配置静态已知)和预定义数量的控制平面线程之间正确同步对某些共享资源的访问。控制平面线程接收来自外部的请求,并基于该请求可能修改共享资源。工作线程只是运行一个无限循环,其中共享资源是只读的。为了以线程安全的方式做到这一点,并考虑到实际的应用程序用例(网络数据包处理、多数据平面线程和多控制平面线程),决定实现“线程屏障”类型的模式。以下是其实现方式的片段,假设应用程序配置为生成 2 个工作线程和 2 个控制平面线程:
std::atomic_bool barrier{};
std::atomic_uint32_t workers_at_barrier{};
// called by control-plane threads only!
void barrier_lock()
{
// optimized spinlock implementation: rigtorp.se/spinlock/
while (true)
{
if (!barrier.exchange(true, std::memory_order_acquire))
break;
while (barrier.load(std::memory_order_relaxed))
__builtin_ia32_pause();
}
assert(barrier);
// wait for ALL worker (data-plane) threads to arrive at the barrier!
while (workers_at_barrier.load() != 2);
assert(workers_at_barrier.load() == 2);
}
// called by control-plane threads only!
void barrier_unlock()
{
assert(barrier && workers_at_barrier.load() == 2);
barrier.store(false, std::memory_order_release);
// wait for ALL workers to get out of the barrier!
while (workers_at_barrier.load() != 0);
}
struct barrier_lock_guard
{
barrier_lock_guard()
{
barrier_lock();
}
~barrier_lock_guard()
{
barrier_unlock();
}
};
// control-plane threads receive some requests and handles them here
void handle_stuff()
{
// ... stuff
{
barrier_lock_guard blg;
// barrier should be set and all workers (2 in this case) should be waiting at the barrier for its release
assert(barrier && workers_at_barrier.load() == 2);
// ... writes to shared resource
}
// ... stuff
}
// called by worker threads only!
void wait_at_barrier()
{
// immediately return if barrier is not set
if (!barrier.load(std::memory_order_acquire))
return;
++workers_at_barrier;
// block at the barrier until it gets released
while (barrier.load(std::memory_order_acquire));
--workers_at_barrier;
}
// function run by the worker threads
void workers_stuff()
{
while (true)
{
wait_at_barrier();
// ... reads from shared resource
}
}
问题是断言assert(barrier && workers_at_barrier.load() == 2);
in handle_stuff()
正在受到打击。这种情况很少发生,所以一定是出了什么问题,我正在努力准确地了解问题出在哪里。很确定,虽然这与不正确的使用有关std::memory_order
。任何 C++ 原子专业人士都可以向我指出确切的问题以及正确的解决方法是什么?提前致谢。
这不是内存排序问题,只是一场普通的竞赛。即使将所有内存顺序升级为顺序一致性后,我也可以重现它。这是我在 godbolt 上的版本 https://godbolt.org/z/GG7jvsqsn尽管我只能在本地重现故障(godbolt 仅在一个核心上运行)。
评论wait for ALL workers to get out of the barrier!
in barrier_unlock
似乎指出了问题。这个循环不强制another控制线程等待;其他线程可以立即占据屏障。
或者,观察值workers_at_barrier == 2
in barrier_lock()
不能证明两个线程现在都在屏障处等待;他们可能在之前关闭时已经通过了它,但还没有抽出时间来减少原子计数器。
想象一下以下的事件顺序。我们有控制线程 C1、C2 和工作线程 W1、W2。 C1已经占据障碍物并且刚刚进入barrier_unlock()
。 C2刚刚进入barrier_lock()
。 W1和W2都在旋转while(barrier.load())
in wait_at_barrier()
, and workers_at_barrier
有价值2
.
-
C1: barrier.store(false)
-
W1: barrier.load()
: false
,自旋循环退出
-
C2: barrier.exchange(true)
:返回false
。跳出循环。现在barrier == true
.
-
C2: assert(barrier)
(通过)
-
C2: workers_at_barrier.load()
: 2. 的while
循环立即退出。
-
C2: assert(workers_at_barrier.load() == 2)
(通过)
-
C2 从返回barrier_lock()
-
W1: --workers_at_barrier
: 1
-
C2 in handle_stuff()
: Now barrier == true
and workers_at_barrier == 1
。断言失败。
我不确定临时的最佳解决方案。也许barrier
应该有第三个“耗尽”状态,其中控制线程仍然拥有屏障,但工作人员可以离开它。只有在他们这样做之后,控制线程才会完全释放屏障。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)