在屏障实现中将代码从顺序一致性更改为不太严格的排序

2023-12-27

我遇到了这段代码,用于简单地实现屏障(对于无法使用的代码std::experimental::barrier在 C++17 中或std::barrier在 C++20 中)在《C++ Concurrency in Action》一书中。

[编辑] 屏障是一种同步机制,其中一组线程(线程数传递给屏障的构造函数)可以到达并等待(通过调用 wait 方法)或到达并丢弃(通过调用 did_waiting)。如果组中的所有线程都到达屏障,则屏障将被重置,并且线程可以继续执行下一组操作。如果组中的某些线程脱落,则组中的线程数量相应减少,以进行下一轮与屏障的同步。 [编辑结束]

以下是为简单实现屏障而提供的代码。

struct barrier
{
   std::atomic<unsigned> count;
   std::atomic<unsigned> spaces;
   std::atomic<unsigned> generation;
   barrier(unsigned count_):count(count_),spaces(count_),generation(0)
   {}
   void wait(){
      unsigned const gen=generation.load();
      if(!--spaces){
         spaces=count.load();
         ++generation;
      }else{
         while(generation.load()==gen){
            std::this_thread::yield();
         }
      }
   }
   void done_waiting(){
      --count;
      if(!--spaces){
         spaces=count.load();
         ++generation;
      }
   }
};

作者 Anthony Williams 提到,他选择顺序一致性排序是为了更容易推理代码,并表示可以使用宽松的排序来提高代码效率。这就是我更改代码以采用宽松排序的方式。请帮助我理解我的代码是否正确。

struct barrier
{
   std::atomic<unsigned> count;
   std::atomic<unsigned> spaces;
   std::atomic<unsigned> generation;
   barrier(unsigned count_):count(count_),spaces(count_),generation(0)
   {}
   void wait(){
      unsigned const gen=generation.load(std::memory_order_acquire);
      if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
         spaces=count.load(std::memory_order_relaxed);
         generation.fetch_add(1, std::memory_order_release);
      }else{
         while(generation.load(std::memory_order_relaxed)==gen){
            std::this_thread::yield();
         }
      }
   }
   void done_waiting(){
      count.fetch_sub(1, std::memory_order_relaxed);
      if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
         spaces=count.load(std::memory_order_relaxed);
         generation.fetch_add(1, std::memory_order_release);
      }
   }
};

道理是这样的。生成的增量是一个释放操作,与等待调用中生成的加载同步。这确保了从 count 到空间的加载对于所有调用 wait 并读取使用释放语义存储的 Generation 新值的线程都是可见的。

此后对空间的所有操作都是RMW操作,它们参与释放序列,因此可以是宽松的操作。这个推理是正确的还是这个代码是错误的?请帮助我理解。提前致谢。

[编辑]我尝试像这样使用我的屏障代码。

void fun(barrier* b){
        std::cout << "In Thread " << std::this_thread::get_id() << std::endl;
        b->wait();
        std::cout << std::this_thread::get_id() << " First wait done" << std::endl;
        b->wait();
        std::cout << std::this_thread::get_id() << " Second wait done" << std::endl;
        b->done_waiting();
}

int main(){
        barrier b{2};
        std::thread t(fun, &b);
        fun(&b);
        std::cout << std::this_thread::get_id() << " " <<  b.get_count() << std::endl;
        t.join();
}

我还尝试使用更多线程来测试它,并且在表面运行中,它似乎做了正确的事情。但我仍然想了解我的推理是否正确,或者我是否遗漏了一些非常明显的东西。[编辑结束]


代码的修改版本

我相信以下版本的代码具有最弱的顺序 这仍然使它正确。

struct barrier
{
   std::atomic<unsigned> count;
   std::atomic<unsigned> spaces;
   std::atomic<unsigned> generation;
   barrier(unsigned count_):count(count_),spaces(count_),generation(0)
   {}
   void wait(){
      unsigned const gen=generation.load(std::memory_order_relaxed);
      if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
         spaces.store(count.load(std::memory_order_relaxed),
                      std::memory_order_relaxed);
         generation.fetch_add(1, std::memory_order_release);
      }else{
         while(generation.load(std::memory_order_acquire)==gen){
            std::this_thread::yield();
         }
      }
   }
   void done_waiting(){
      count.fetch_sub(1, std::memory_order_relaxed);
      if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
         spaces.store(count.load(std::memory_order_relaxed), 
                      std::memory_order_relaxed);
         generation.fetch_add(1, std::memory_order_release);
      }
   }
};

新版本讨论

在整个讨论中,当我们谈到到达屏障的“最后”线程时 特定的一代,我们指的是看到的独特线索spaces.fetch_sub(1)返回值 1。

一般来说,如果一个线程正在创建一个承诺某些操作的存储 完成后,它需要成为一个发布商店。当另一个线程加载时 该值可以证明操作已完成且安全 要使用结果,则需要获取该负载。

现在,在手头的代码中,线程指示它已完成其“常规”的方式 工作”(在调用之前排序的内容wait() or done_waiting()) 是通过递减spaces,即存储一个值 比修改顺序中的前一个值小 1。为了 一个非最后一个线程,那就是only存储它的作用。所以 那家商店,即spaces.fetch_sub(),必须释放。

对于最后一个线程,它知道它是最后一个的方法是通过加载 中的值 1spaces.fetch_sub()。这足以证明所有 其他线程已完成其常规工作,最后一个线程 线程可以安全地丢弃屏障。所以spaces.fetch_sub()需要 也可以获取。因此spaces.fetch_sub(1, std::memory_order_acq_rel).

非最后线程确定屏障已关闭的方式 他们可以安全地继续通过加载一个值generation在 yield 循环,并观察它与gen。所以 需要获取该负载;以及它观察到的商店,即generation.fetch_add(),需要释放。

我认为这就是我们需要的所有障碍。所做的工作last要更新的线程spaces from count有效地在其自己的 小关键部分,从获取负载 1 开始spaces并以释放增量结束generation。在 此时,所有其他线程都已加载并存储spaces, 每个调用的线程wait已经加载了旧值generation,以及每个调用的线程done_waiting()有 已经存储了新的减量值count。所以最后 线程可以通过轻松的排序安全地操作它们,因为知道 没有其他线程会这样做。

现在是初始负载gen = generation.load() in wait() does not需要获取。它不能被推下经过商店spaces,因为后者是释放。所以肯定是安全的 在值 1 存储到之前加载spaces,并且只有在那之后 可能有潜力last线程更新generation.

A proof

现在让我们尝试给出一个正式的证明来证明这个新版本是正确的。看两个线程,A 和 B, 哪个做

barrier b(N); // N >= 2
void thrA() {
    work_A();
    b.wait(); // or b.done_waiting(), doesn't matter which 
}

void thrB() {
    b.wait();
    work_B();
}

N-2其他线程,每个线程都调用b.wait() or b.done_waiting()。为了简单起见,我们假设我们从 第 0 代。

我们想证明这一点work_A()发生在之前work_B(), 以便 他们可以在没有数据的情况下对相同的数据进行操作(冲突) 种族。根据 B 是否是最后一个考虑两种情况 线程到达障碍物。

假设B是最后一个线程。这意味着它获取了 值 1 来自spaces。线程 A 必须有释放存储的一些内容 value >= 1,然后按原子 RMW 连续递减 操作(在其他线程中)直到达到 1。即释放 以 A 的存储为首的序列,B 的负载从 该序列中的最后一个副作用,因此 A 的存储与 B 的存储同步 加载。通过测序,work_A()发生在 A 的商店和 B 的商店之前 负载发生在之前work_B(), hence work_A()发生在之前work_B()如预期的。

现在假设 B 不是最后一个线程。然后它返回wait()仅当它加载时generation一个值不同于gen。 让 L 表示实际的最后一个线程,它可能是 A。

我声称,作为第一步,genB 中的值必须为 0。对于generation into gen发生在 B 发布商店之前spaces.fetch_sub(),如上所述,这导致了一个释放序列 最终存储值 1(在倒数第二个线程中)。这 负载spacesL 中的值来自该副作用,所以 B 的 存储到spaces与L的负载同步。 B 的负载generation发生在其存储之前spaces,L 的负载spaces发生在其存储之前(fetch_add()) to generation。所以 B 的负载generation发生在L的商店前面。通过读写 连贯性 [intro.races p17],B 的负载generation must not从 L 的存储中获取其值,而不是从 修改顺序中的一些较早的值。这必须 必然为 0,因为没有其他修改generation.

(如果我们在 G 代而不是 0 代工作,这只能证明gen <= G。但正如我在下面解释的,所有这些负载都发生在之前的增量之后generation,这是值 G 的存储位置。这样就证明了相反的不等式gen >= G.)

所以 B 从wait()仅当它加载 1 时generation。 因此,最终的获取负载已从发布中获取其值 存储到generation由 L 完成,表明 L 的商店发生在之前 B回来了现在 L 的负载为 1spaces发生在它的商店之前 到generation(通过排序),A 的存储到spaces发生 如前所述,在 L 的负载为 1 之前。 (如果 L = A 则相同 通过排序,结论仍然成立。)我们现在有以下结果 操作完全按照之前发生的顺序排序:

A: work_A();
A: store side of spaces.fetch_sub()
L: load of 1 in spaces.fetch_sub()
L: store side of generation.fetch_add()
B: acquire load of 1 from generation
B: work_B()

并通过传递性得到想要的结论。 (如果 L=A,则删除上面的第 2 行和第 3 行。)

我们可以类似地证明所有的减量count在里面 各种电话done_waiting()发生在 L 的负载之前count到 将其存储到spaces,因此可以安全地放松这些。这 重新初始化spacesL 中,增量为generation, 两者都发生在任何线程从wait(),所以即使那些 放松,之后排序的任何屏障操作都会看到 障碍正确重置。

我认为这涵盖了所有所需的语义。

Fences

实际上,我们可以通过使用栅栏来进一步削弱一些东西。为了 例如,acq in spaces.fetch_sub()纯粹是为了利益 线程 L 加载值 1;其他线程不需要它。所以 我们可以这样做

if (1 == spaces.fetch_sub(1, std::memory_order_release)){
    std::atomic_thread_fence(std::memory_order_acquire);
    // ...
}

那么只有线程L需要支付acquire的成本。不是那个 这真的很重要,因为所有其他线程都会休眠 无论如何,所以我们不太可能关心它们是否缓慢。

OP 原始版本中的数据竞争

(本节是在我做出上面的修改版本之前写的。)

我相信至少有两个错误。

我们来分析一下wait()通过它自己。考虑执行以下操作的代码:

int x = 0;
barrier b(2);

void thrA() {
    x = 1;
    b.wait();
}

void thrB() {
    b.wait();
    std::cout << x << std::endl;
}

我们希望证明x = 1在thrA发生在之前的评价x在 thrB 中,这样代码就不会出现数据竞争并被迫打印该值1.

但我认为我们不能。假设 thrB 首先到达屏障,也就是说它观察到spaces.fetch_sub返回 2。因此每个线程中执行的加载和存储的顺序如下:

thrA:
x = 1;
gen=generation.load(std::memory_order_acquire);     // returns 0
spaces.fetch_sub(1, std::memory_order_relaxed);     // returns 1, stores 0
spaces=count.load(std::memory_order_relaxed);       // stores 2
generation.fetch_add(1, std::memory_order_release); // returns 0, stores 1

thrB:
gen=generation.load(std::memory_order_acquire);     // returns 0
spaces.fetch_sub(1, std::memory_order_relaxed);     // returns 2, stores 1
generation.load(std::memory_order_relaxed);         // returns 0
... many iterations
generation.load(std::memory_order_relaxed);         // returns 1
x;                                                  // returns ??

为了有希望,我们必须做一些手术A在 thrA 中与某些操作同步B在thrB。这只有在以下情况下才有可能:B是一个获取操作,它从以释放序列为首的副作用中获取其值A。但thrB中只有一次acquire操作,即初始的generation.load(std::memory_order_acquire)。并且它不取它的值(即0) 来自 thrB 中的任何操作,但来自 的初始化generation这发生在任一线程启动之前。这个副作用不是任何有用的释放序列的一部分,当然也不是以之后发生的操作为首的任何释放序列的一部分x=1。所以我们的证明尝试失败了。

更非正式地说,如果我们检查thrB,我们看到评估x可以在任何或所有宽松操作之前重新排序。事实上,它只是有条件地评估generation.load(std::memory_order_relaxed)返回 1 没有帮助;我们可以有x推测加载得更早,并且该值仅在之后使用generation.load(std::memory_order_relaxed)最终返回 1。所以我们所知道的是x之后的某个时间进行评估generation.load(std::memory_order_acquire)返回 0,这完全没有给我们提供任何关于 thrA 到那时可能或可能不做什么的有用信息。

这个特殊问题可以通过升级负载来解决generation在自旋循环中进行获取,或者在循环退出之后但之前放置获取栅栏wait()返回。


As for done_waiting,看来也有问题。如果我们有

void thrA() {
    x = 1;
    b.done_waiting();
}

void thrB() {
    b.wait();
    std::cout << x;
}

那么大概我们又想要1打印,没有数据竞争。但假设 thrA 首先到达障碍物。那么它所做的就是

x = 1;
count.fetch_sub(1, std::memory_order_relaxed); // irrelevant
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 2, stores 1

根本没有发布存储,因此它无法与 thrB 同步。

非正式地说,没有任何障碍可以阻止商店x=1以免被无限期推迟,因此不能保证 thrB 会遵守。

现在已经很晚了,所以目前我将其作为如何解决此问题的练习。


顺便说一句,ThreadSanitizer 会检测这两种情况的数据争用:https://godbolt.org/z/1MdbMbYob https://godbolt.org/z/1MdbMbYob。我可能应该先尝试一下,但最初我不太清楚要实际测试什么。


目前我不确定这些是否是唯一的错误,或者是否还有更多错误。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在屏障实现中将代码从顺序一致性更改为不太严格的排序 的相关文章

随机推荐

  • FlushViewOfFile (Windows) 和 msync(Linux) 的时间消耗

    我们感兴趣的是时间消耗FlushViewOfFile https learn microsoft com en us windows win32 api memoryapi nf memoryapi flushviewoffile and
  • Scala 过滤器元组 (x, y) == (y, x)

    我有一个元组列表 例如 0 2 0 5 2 0 2 5 3 4 4 3 5 0 5 2 有一些元组在哪里 x y y x 例如 5 0 和 0 5 我只想留下其中一个 例如第一个 我该怎么做 使用左折叠 var ts List 0 2 0
  • Python 中真正的非阻塞 HTTPS 服务器

    我正在尝试用 Python 构建一个真正的非阻塞 HTTPS 服务器 如果每个人都玩得很好 下面的最小代码就可以正常工作 import BaseHTTPServer import SimpleHTTPServer import Socket
  • Python struct.pack 和 unpack

    我绝不是一个经验丰富的Python程序员 这就是为什么我相信这个问题可能有一个明显的答案 但我就是无法理解struct pack和unpack 我有以下代码 struct pack lt I elements self buf elemen
  • Android Compose 导航和 ViewModel 生命周期

    我刚刚开始使用 Compose 对于我来说 第一眼看上去 它就像是我喜欢的 SwiftUI 的副本 但当我真正开始使用它时 我很快就遇到了很多问题 显然 我需要找到如何使用它才能从中受益的正确方法 这是我的问题之一 package org
  • 确定 STDERR 是否要终端

    我有一套 Java 程序 它们在我们的 Linux 服务器上用作命令行工具 他们中的大多数使用一个在 STDERR 上打印进度条的类 类似于 Perl 的Term ProgressBar 我希望每当 STDERR 进入终端时显示进度条 并在
  • 使用表达式树的 Foreach 循环

    我见过这个构建动态表达式树时出现问题 https stackoverflow com questions 3646283 issue while building dynamic expression tree and 表达式 语句树 ht
  • OutOfMemoryException - 没有想法

    我知道我的问题没有简单的答案 但我希望得到一些想法 指南或 某种要看的东西清单 我有一个网络 Windows 服务不断抛出 OutOfMemoryException 该服务有两个针对 x86 和 x64 Windows 的版本 然而在 x6
  • C# 中 Bmp 转 jpg/png

    有没有办法在 C 中将 bmp 图像转换为 jpg png 而不损失质量 使用 Image 类我们可以将 bmp 转换为 jpg 但输出图像的质量很差 我们能否获得与使用最高质量的 Photoshop 转换为 jpg 的图像一样好的质量水平
  • 如何在netbeans中创建gradle多项目?

    我是 gradle 新手 正在尝试在 netbeans 中创建一个目录布局 myproject ear build gradle core build gradle web build gradle include core project
  • 了解 scikit-learn 中的 DictVectorizer?

    我正在探索不同的特征提取类scikit learn提供 正在阅读文档 http scikit learn org stable modules generated sklearn feature extraction DictVectori
  • 发光问题,未解决的外部问题

    我想开始使用 OpenGL 3 和 4 但我在让 Glew 工作时遇到问题 我尝试将 glew32 lib 包含在附加依赖项中 并且已将库和 dll 移动到主文件夹中 因此不应该有任何路径问题 我收到的错误是 Error 5 error L
  • PHP 7.x 使用 MAMP 与 MSSQL 服务器连接

    我正在尝试通过 MAMP 将 mssql 服务器连接到 PHP 7 0 8 我尝试过使用 freetds 在一些博客上人们说使用pdo dblib so扩展 但它不起作用 请指导我完成连接过程 对于仍然遇到此问题的人 Application
  • 使 sympy 表达式中的所有符号可交换

    假设 sympy 表达式中有许多非交换符号 例如 a c sympy symbols a c commutative False b sympy Symbol b expr a c b c 使表达式中的所有符号可交换的首选方式是什么 例如
  • 使用 lambda 作为函数指针时,模板参数推导/替换失败

    我想知道为什么在下面的代码中编译器无法使用 lambda 作为函数 foo 的参数 模板参数推导 替换失败 而一个简单的函数却可以工作 template
  • Datomic 中的 SQL LIKE 运算符

    我想运行一个 sql 查询 给定一个搜索关键字 将找到所有用户 其中他们的名字与该模式匹配 即在原始 SQL 中类似WHERE users name LIKE foo 我该怎么做呢 查询的当前结构 gt defn find users db
  • 传单用户触发事件

    有什么方法可以确定事件是通过编程方式触发还是由用户触发 我们希望在地图移动或缩放时重新加载标记列表 但我们最初使用以下命令设置地图的边界setBounds http leafletjs com reference html rectangl
  • 使用 plyr 将参数传递给 R 函数

    我无法解决一个问题 想写一个这样的函数 f describe lt function data var by require plyr res lt ddply data by summarize N sum is na var Mean
  • 如何在react-native android应用程序中显示GIF?

    我想通过我的 android 反应本机应用程序中的图像标签中的 URL 显示一个简单的 gif 但是当我启动它时 没有显示图像 中提供的代码docs https facebook github io react native docs im
  • 在屏障实现中将代码从顺序一致性更改为不太严格的排序

    我遇到了这段代码 用于简单地实现屏障 对于无法使用的代码std experimental barrier在 C 17 中或std barrier在 C 20 中 在 C Concurrency in Action 一书中 编辑 屏障是一种同