代码的修改版本
我相信以下版本的代码具有最弱的顺序
这仍然使它正确。
struct barrier
{
std::atomic<unsigned> count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
barrier(unsigned count_):count(count_),spaces(count_),generation(0)
{}
void wait(){
unsigned const gen=generation.load(std::memory_order_relaxed);
if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
spaces.store(count.load(std::memory_order_relaxed),
std::memory_order_relaxed);
generation.fetch_add(1, std::memory_order_release);
}else{
while(generation.load(std::memory_order_acquire)==gen){
std::this_thread::yield();
}
}
}
void done_waiting(){
count.fetch_sub(1, std::memory_order_relaxed);
if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
spaces.store(count.load(std::memory_order_relaxed),
std::memory_order_relaxed);
generation.fetch_add(1, std::memory_order_release);
}
}
};
新版本讨论
在整个讨论中,当我们谈到到达屏障的“最后”线程时
特定的一代,我们指的是看到的独特线索spaces.fetch_sub(1)
返回值 1。
一般来说,如果一个线程正在创建一个承诺某些操作的存储
完成后,它需要成为一个发布商店。当另一个线程加载时
该值可以证明操作已完成且安全
要使用结果,则需要获取该负载。
现在,在手头的代码中,线程指示它已完成其“常规”的方式
工作”(在调用之前排序的内容wait()
or
done_waiting()
) 是通过递减spaces
,即存储一个值
比修改顺序中的前一个值小 1。为了
一个非最后一个线程,那就是only存储它的作用。所以
那家商店,即spaces.fetch_sub()
,必须释放。
对于最后一个线程,它知道它是最后一个的方法是通过加载
中的值 1spaces.fetch_sub()
。这足以证明所有
其他线程已完成其常规工作,最后一个线程
线程可以安全地丢弃屏障。所以spaces.fetch_sub()
需要
也可以获取。因此spaces.fetch_sub(1, std::memory_order_acq_rel)
.
非最后线程确定屏障已关闭的方式
他们可以安全地继续通过加载一个值generation
在
yield 循环,并观察它与gen
。所以
需要获取该负载;以及它观察到的商店,即generation.fetch_add()
,需要释放。
我认为这就是我们需要的所有障碍。所做的工作last
要更新的线程spaces
from count
有效地在其自己的
小关键部分,从获取负载 1 开始spaces
并以释放增量结束generation
。在
此时,所有其他线程都已加载并存储spaces
,
每个调用的线程wait
已经加载了旧值generation
,以及每个调用的线程done_waiting()
有
已经存储了新的减量值count
。所以最后
线程可以通过轻松的排序安全地操作它们,因为知道
没有其他线程会这样做。
现在是初始负载gen = generation.load()
in wait()
does not需要获取。它不能被推下经过商店spaces
,因为后者是释放。所以肯定是安全的
在值 1 存储到之前加载spaces
,并且只有在那之后
可能有潜力last
线程更新generation
.
A proof
现在让我们尝试给出一个正式的证明来证明这个新版本是正确的。看两个线程,A 和 B,
哪个做
barrier b(N); // N >= 2
void thrA() {
work_A();
b.wait(); // or b.done_waiting(), doesn't matter which
}
void thrB() {
b.wait();
work_B();
}
有N-2
其他线程,每个线程都调用b.wait()
or b.done_waiting()
。为了简单起见,我们假设我们从
第 0 代。
我们想证明这一点work_A()
发生在之前work_B()
, 以便
他们可以在没有数据的情况下对相同的数据进行操作(冲突)
种族。根据 B 是否是最后一个考虑两种情况
线程到达障碍物。
假设B是最后一个线程。这意味着它获取了
值 1 来自spaces
。线程 A 必须有释放存储的一些内容
value >= 1,然后按原子 RMW 连续递减
操作(在其他线程中)直到达到 1。即释放
以 A 的存储为首的序列,B 的负载从
该序列中的最后一个副作用,因此 A 的存储与 B 的存储同步
加载。通过测序,work_A()
发生在 A 的商店和 B 的商店之前
负载发生在之前work_B()
, hence work_A()
发生在之前work_B()
如预期的。
现在假设 B 不是最后一个线程。然后它返回wait()
仅当它加载时generation
一个值不同于gen
。
让 L 表示实际的最后一个线程,它可能是 A。
我声称,作为第一步,gen
B 中的值必须为 0。对于generation
into gen
发生在 B 发布商店之前spaces.fetch_sub()
,如上所述,这导致了一个释放序列
最终存储值 1(在倒数第二个线程中)。这
负载spaces
L 中的值来自该副作用,所以 B 的
存储到spaces
与L的负载同步。 B 的负载generation
发生在其存储之前spaces
,L 的负载spaces
发生在其存储之前(fetch_add()
) to generation
。所以
B 的负载generation
发生在L的商店前面。通过读写
连贯性 [intro.races p17],B 的负载generation
must not从 L 的存储中获取其值,而不是从
修改顺序中的一些较早的值。这必须
必然为 0,因为没有其他修改generation
.
(如果我们在 G 代而不是 0 代工作,这只能证明gen <= G
。但正如我在下面解释的,所有这些负载都发生在之前的增量之后generation
,这是值 G 的存储位置。这样就证明了相反的不等式gen >= G
.)
所以 B 从wait()
仅当它加载 1 时generation
。
因此,最终的获取负载已从发布中获取其值
存储到generation
由 L 完成,表明 L 的商店发生在之前
B回来了现在 L 的负载为 1spaces
发生在它的商店之前
到generation
(通过排序),A 的存储到spaces
发生
如前所述,在 L 的负载为 1 之前。 (如果 L = A 则相同
通过排序,结论仍然成立。)我们现在有以下结果
操作完全按照之前发生的顺序排序:
A: work_A();
A: store side of spaces.fetch_sub()
L: load of 1 in spaces.fetch_sub()
L: store side of generation.fetch_add()
B: acquire load of 1 from generation
B: work_B()
并通过传递性得到想要的结论。 (如果 L=A,则删除上面的第 2 行和第 3 行。)
我们可以类似地证明所有的减量count
在里面
各种电话done_waiting()
发生在 L 的负载之前count
到
将其存储到spaces
,因此可以安全地放松这些。这
重新初始化spaces
L 中,增量为generation
,
两者都发生在任何线程从wait()
,所以即使那些
放松,之后排序的任何屏障操作都会看到
障碍正确重置。
我认为这涵盖了所有所需的语义。
Fences
实际上,我们可以通过使用栅栏来进一步削弱一些东西。为了
例如,acq
in spaces.fetch_sub()
纯粹是为了利益
线程 L 加载值 1;其他线程不需要它。所以
我们可以这样做
if (1 == spaces.fetch_sub(1, std::memory_order_release)){
std::atomic_thread_fence(std::memory_order_acquire);
// ...
}
那么只有线程L需要支付acquire的成本。不是那个
这真的很重要,因为所有其他线程都会休眠
无论如何,所以我们不太可能关心它们是否缓慢。
OP 原始版本中的数据竞争
(本节是在我做出上面的修改版本之前写的。)
我相信至少有两个错误。
我们来分析一下wait()
通过它自己。考虑执行以下操作的代码:
int x = 0;
barrier b(2);
void thrA() {
x = 1;
b.wait();
}
void thrB() {
b.wait();
std::cout << x << std::endl;
}
我们希望证明x = 1
在thrA发生在之前的评价x
在 thrB 中,这样代码就不会出现数据竞争并被迫打印该值1
.
但我认为我们不能。假设 thrB 首先到达屏障,也就是说它观察到spaces.fetch_sub
返回 2。因此每个线程中执行的加载和存储的顺序如下:
thrA:
x = 1;
gen=generation.load(std::memory_order_acquire); // returns 0
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 1, stores 0
spaces=count.load(std::memory_order_relaxed); // stores 2
generation.fetch_add(1, std::memory_order_release); // returns 0, stores 1
thrB:
gen=generation.load(std::memory_order_acquire); // returns 0
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 2, stores 1
generation.load(std::memory_order_relaxed); // returns 0
... many iterations
generation.load(std::memory_order_relaxed); // returns 1
x; // returns ??
为了有希望,我们必须做一些手术A
在 thrA 中与某些操作同步B
在thrB。这只有在以下情况下才有可能:B
是一个获取操作,它从以释放序列为首的副作用中获取其值A
。但thrB中只有一次acquire操作,即初始的generation.load(std::memory_order_acquire)
。并且它不取它的值(即0
) 来自 thrB 中的任何操作,但来自 的初始化generation
这发生在任一线程启动之前。这个副作用不是任何有用的释放序列的一部分,当然也不是以之后发生的操作为首的任何释放序列的一部分x=1
。所以我们的证明尝试失败了。
更非正式地说,如果我们检查thrB
,我们看到评估x
可以在任何或所有宽松操作之前重新排序。事实上,它只是有条件地评估generation.load(std::memory_order_relaxed)
返回 1 没有帮助;我们可以有x
推测加载得更早,并且该值仅在之后使用generation.load(std::memory_order_relaxed)
最终返回 1。所以我们所知道的是x
之后的某个时间进行评估generation.load(std::memory_order_acquire)
返回 0,这完全没有给我们提供任何关于 thrA 到那时可能或可能不做什么的有用信息。
这个特殊问题可以通过升级负载来解决generation
在自旋循环中进行获取,或者在循环退出之后但之前放置获取栅栏wait()
返回。
As for done_waiting
,看来也有问题。如果我们有
void thrA() {
x = 1;
b.done_waiting();
}
void thrB() {
b.wait();
std::cout << x;
}
那么大概我们又想要1
打印,没有数据竞争。但假设 thrA 首先到达障碍物。那么它所做的就是
x = 1;
count.fetch_sub(1, std::memory_order_relaxed); // irrelevant
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 2, stores 1
根本没有发布存储,因此它无法与 thrB 同步。
非正式地说,没有任何障碍可以阻止商店x=1
以免被无限期推迟,因此不能保证 thrB 会遵守。
现在已经很晚了,所以目前我将其作为如何解决此问题的练习。
顺便说一句,ThreadSanitizer 会检测这两种情况的数据争用:https://godbolt.org/z/1MdbMbYob https://godbolt.org/z/1MdbMbYob。我可能应该先尝试一下,但最初我不太清楚要实际测试什么。
目前我不确定这些是否是唯一的错误,或者是否还有更多错误。