C++ 并行编程(thread)---多线程

2023-11-15

C++ 并行编程---多线程

Reference:

  1. 理解并行与并发
  2. 进程线程(一)——基础知识,什么是进程?什么是线程?
  3. cplusplus.com: Reference <thread> 包含每个API的详细用法
  4. 【公开课】C++11开始的多线程编程(#5)文章的主要来源,建议看原视频
  5. 线程的 5 种状态
  6. Linux 下的进程间通信:套接字和信号 | Linux 中国

相关文章:

  1. C++ RAII机制
  2. C++17 并行STL算法
  3. OpenCV 并行计算函数 parallel_for_ 的使用

1. 并发与并行

可以先记住:“并发”指的是程序的结构,“并行”指的是程序运行时的状态。

  • 并行(parallelism):就是同时执行的意思。判断程序是否处于并行的状态,就看同一时刻是否有超过一个“工作单元”在运行就好了。所以,单线程永远无法达到并行的状态。
  • 并发(concurrency):并发指的是程序的“结构”。当我们说这个程序是并发的,实际上,这句话应当表述成“这个程序采用了支持并发的设计”。所以正确的并发设计的标准是:“使多个操作可以在重叠的时间段内进行。”

这句话的重点有两个:

  1. “(操作)在重叠的时间段内进行”:它与前面说到的并行并不完全相同。并行,当然是在重叠的时间段内执行;但是另外一种执行模式,也属于在重叠时间段内进行,这就是协程
    在使用协程时,程序的执行看起来是这个样子的:
    在这里插入图片描述
    task1 和 task 是两段不同的代码,比如两个函数(一个是处理行情的函数,一个是处理交易的函数),其中黑色块代表某段代码正在执行。注意,这里从始至终,在任何一个时间点上都只有一段代码在执行。但由于 task1 和 task2 在重叠的时间段内执行,所以这是一个支持并发的设计。与并行不同,单核单线程支持并发。
    在这里插入图片描述

  2. “可以在重叠的时间段内进行”中的“可以”两个字:意思是正确的并发设计使并发执行成为可能,但是程序在实际运行时却不一定会出现多个任务执行时间段重叠的情形。比如:我们的程序会为每一个任务开一个线程或协程,只有一个任务时,显然不会出现多个任务执行时间段重叠的情况,有多个任务时,就出现了。

    这里就可以看到,并发并不描述程序执行的状态,它描述的是一种设计,是程序的结构,比如上面例子里“为每个任务开一个线程”的设计。并发设计和程序实际执行情况并没有直接关联,但是正确的并发设计让并发执行称为可能。反之,如果程序被设计为执行完一个任务再接着执行下一个,那就不是并发设计了,因为做不到并发执行。

之所以并发设计往往需要把流程拆开,是因为如果不拆分也就不可能在同一时间段进行多个任务了。这种拆分可以是平行的拆分,比如抽象成同类的任务,也可以是不平行的,比如分为多个步骤。
在这里插入图片描述
综上,在程序设计层面,并发设计让并发执行成为可能,而并行是并发执行的一种模式。(并行∈并发执行,并发执行=并行+协程)

2. 进程和线程

2.1 常规解释

进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体

线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

2.2 总结

进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程——资源分配的最小单位。

线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。是程序执行的最小单位。

2.3 具体理解

Linux 环境下,每个进程有自己各自独立的大小(4G)的地址空间,大家互不干扰对方,如果两个进程之间通信的话,还需要借助第三方进程间通信工具 Inter-Process Communication, IPC 才能完成(关于进程间通信推荐这篇文章:Linux 下的进程间通信:套接字和信号 | Linux 中国 )。不同的进程通过页表映射,映射到物理内存上各自独立的存储空间,在操作系统的调度下,分别轮流占用 CPU 去运行,互不干扰、互不影响,甚至相互都不知道对方。在每个进程的眼里,CPU 就是他的整个世界,虽然不停地被睡眠,但是一旦恢复运行,一觉醒来,仿佛什么都没发生过一样,认为自己拥有整个 CPU,一直在占有它。

在一个进程中,可能存在多个线程,每个线程类似于合租的每个租客,除了自己的私有空间外,还跟其它线程共享进程的很多资源,如地址空间、全局数据、代码段、打开的文件等等。在线程中,通过各种加锁解锁的同步机制,一样可以用来防止多个线程访问共享资源产生冲突,比如互斥锁、条件变量、读写锁等。

进程具有的特征:

  • 动态性:进程是程序的一次执行过程,是临时的,有生命期的,是动态产生,动态消亡的;
  • 并发性:任何进程都可以同其他进程一起并发执行;
  • 独立性:进程是系统进行资源分配和调度的一个独立单位;
  • 结构性:进程由程序,数据和进程控制块三部分组成

对于操作系统来说,它可以同时运行多个任务。你可以一边听歌,一边打游戏,一边还等着QQ开着语音聊着天,这就是多任务,至少同时有 3 3 3 个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。对于过去的单核 CPU,也可以完成这些任务,由于 CPU 执行代码都是顺序执行的,那么,单核 CPU 就轮流让各个任务交替执行,任务 1 1 1 执行 0.01 0.01 0.01 秒,切换到任务 2 2 2,任务 2 2 2 执行 0.01 0.01 0.01 秒,再切换到任务 3 3 3,执行 0.01 0.01 0.01 秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于 CPU 的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。

真正的并行执行多任务只能在多核 CPU 上实现,但是,由于任务数量远远多于 CPU 的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。

对于操作系统来说,一个任务就是一个进程,比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。

有些进程还不止同时干一件事,比如 Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程

由于每个进程至少要干一件事,所以,一个进程至少有一个线程。当然,像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程需要多核CPU才可能实现。

2.4 为什么使用多线程

  1. 和进程相比,它是一种非常“节俭”的多任务操作方式。在 Linux 系统中,启动一个新的进程必须分配给它独立的地址空间,建立众多的数据表来维护其代码段、堆栈段和数据段,这种多任务工作方式的代价非常“昂贵”。而运行于一个进程中的多个线程,它们彼此之间使用相同的地址空间,共享大部分数据,启动一个线程所花费的空间远远小于启动一个进程所花费的空间,而且线程间彼此切换所需要时间也远远小于进程间切换所需要的时间

  2. 线程间方便的通信机制。对不同进程来说它们具有独立的数据空间,要进行数据的传递只能通过通信的方式进行。这种方式不仅费时,而且很不方便。线程则不然,由于同一进程下的线程之间共享数据空间,所以一个线程的数据可以直接为其他线程所用,不仅方便,而且快捷。

2.5 进程和线程的区别

  1. 什么是进程,什么是线程?
    • 进程是程序一次执行的过程,动态的,进程切换时系统开销大
    • 线程是轻量级进程,切换效率高
  2. 进程和线程的空间分配?
    • 每个进程都有独立的0-3G的空间,都参与内核调度,互不影响
    • 同一进程中的线程共享相同的地址空间(共享0-3G)
  3. 进程之间和线程之间各自的通信方式
    • 进程间(from Linux 下的进程间通信:套接字和信号 | Linux 中国 ):
      • 共享文件(比如一个进程创建和写入一个文件,然后另一个进程从这个相同的文件中进行读取);
      • 共享内存(使用信号量)(在任何时候当共享内存进入一个写入者场景时,无论是多进程还是多线程,都有遇到基于内存的竞争条件的风险,所以,需要引入信号量来协调(同步)对共享内存的获取);
      • 管道(无名管道、命名管道);
      • 消息队列;
      • 套接字(socket)(套接字的两种类型:IPC 套接字(即 Unix 套接字)给予进程在相同设备(主机)上基于通道的通信能力;网络套接字给予进程运行在不同主机的能力,因此也带来了网络通信的能力。网络套接字需要底层协议的支持,例如 TCP(传输控制协议)或 UDP(用户数据报协议));
      • 信号(信号会中断一个正在执行的程序,在这种意义下,就是用信号与这个程序进行通信。大多数的信号要么可以被忽略(阻塞)或者被处理(通过特别设计的代码)。信号可以在与用户交互的情况下发生。例如,一个用户从命令行中敲了 Ctrl+C 来终止一个从命令行中启动的程序;Ctrl+C 将产生一个 SIGTERM 信号。SIGTERM 意即终止,它可以被阻塞或者被处理,而不像 SIGKILL 信号那样。一个进程也可以通过信号和另一个进程通信,这样使得信号也可以作为一种 IPC 机制)。
    • 线程间:全局变量,信号量,互斥锁

3. C++中的多线程

C++11 标准提供了一个新的线程库,内容包括了管理线程、保护共享数据、线程间的同步操作、低级原子操作(atomic operation)等各种类。

<thread>: 包含std::thread类以及std::this_thread命名空间。管理线程的函数和类在该头文件中有声明;
<atomic>: 包含std::atomic和std::atomic_flag类,以及一套C风格的原子类型和与C兼容的原子操作的函数;
<mutex>:  包含了与互斥量相关的类以及其他类型的函数;
<future>: 包含两个Provider类(std::promise和std::package_task)和两个Future类(std::future和std::shared_future)以及相关的类型和函数;
<condition_variable>: 包含与条件变量相关的类,包括std::condition_variable和std::condition_variable_any

在这里插入图片描述

那么现在开始吧,看看C++11开始的多线程编程是如何实现的!


3.1 存储持续性-补充

C++11 开始有四种不同的方案来存储数据(新增线程存储),这些方案的区别就在于数据保留在内存中的时间(详情可阅读 Storage-class specifiers):

  • 自动存储持续性:在函数定义中声明的变量(包括函数参数)的存储持续性为自动的。它们在程序开始执行其所属的函数或代码块时被创建,在执行完函数或代码块时,它们使用的内存被释放;
  • 静态存储持续性:在函数定义外定义的变量和使用关键字 static 定义的变量的存储持续性都为静态。它们在程序整个运行过程中都存在;
  • 动态存储持续性:用 new 运算符分配的内存将一直存在,直到使用 delete 运算符将其释放或程序结束为止。这种内存的存储持续性为动态,有时被称为自由存储(free store)堆(heap)
  • 线程存储持续性:当前,多核处理器很常见,这些 CPU 可同事处理多个执行任务。这让程序能够将计算放在可并行处理的不同线程中。如果变量是使用关键字 thread_local 声明的,则其生命周期与所属的线程一样长。

当声明一个变量 thread_local,那么每个 thread 都有自己的副本,举个例子:

#include <iostream>
#include <thread>

thread_local int i=0;

void f(int newval){
    i=newval;
}

void g(){
    std::cout<<i << std::endl;
}

void threadfunc(int id){
    f(id);
    ++i;
    g();
}

int main(){
    i=9;
    std::thread t1(threadfunc,1);
    std::thread t2(threadfunc,2);
    std::thread t3(threadfunc,3);

    t1.join();
    t2.join();
    t3.join();
    std::cout<<i<<std::endl;
}

输出结果为: 3 3 3 2 2 2 4 4 4 9 9 9(顺序不定)。


4. 从头文件 <thread> 开始

  • <thread>
    Header that declares the thread class and the this_thread namespace.
    这里说的是头文件 thread,其包含 类 thread命名空间 this_thread

4.1 线程的 5 种状态

  1. 初始化(INIT):该线程正在被创建-----首先申请一个空白的 TCB(Thread Control Block, 线程控制模块,控制着线程的运行和调度),并向 TCB 中填写一些控制和管理进程的信息;然后由系统为该进程分配运行时所必须的资源;最后把该进程转入到就绪状态。
    TCB 组成(线程TCB详解):
    1. threadID:线程的唯一标识;
    2. status:线程运行的状态;
    3. register:线程关于 CPU 中寄存器的情况;
    4. PC 程序计数器:线程执行的下一条指令的地址;
    5. 优先级:线程在操作系统调度的时候的优先级;
    6. 线程的专属存储区:线程单独额存储区域(C++管理数据内存的方式:自动存储、静态存储、动态存储、线程存储);
    7. 用户栈:线程执行的用户方法栈,用来保存线程当前执行的用户方法的信息;
    8. 内核栈:线程执行的内核方法栈,用来保存线程当前执行的内核方法信息。
  2. 就绪(READY):该线程在就绪列表中,等待 CPU 调度。
  3. 运行(RUNNING):该线程正在运行。
  4. 阻塞(BLOCKED):该线程被阻塞挂起。
    BLOCKED 状态包括:PEND(锁、事件、信号量等阻塞)、SUSPEND(主动 PEND)、DELAY(延时阻塞)、PENDTIME(因为锁、事件、信号量事件等超时等待)。
  5. 退出(EXIT):该线程运行结束,等待父线程回收其控制块资源。

5. 时间管理

5.1 C语言:time.h

long t0 = time(NULL);    // 获取从1970年1月1日到当前时经过的秒数
sleep(3);                        // 让程序休眠3秒
long t1 = t0 + 3;             // 当前时间的三秒后
usleep(3000000);          // 让程序休眠3000000微秒,也就是3秒

在 C语言中,使用 time(NULL) 获取当前时间,返回的是一个整数long。
其中,使用 sleep() 是让程序休息整数秒;而如果想让程序休息微秒,需使用 usleep()。
这样可以看出,C 语言原始的 API 内,没有类型区分,导致很容易弄错单位,混淆时间点时间段

5.2 C++11 时间标准库:std::chrono

因此,从 C++11 开始,就将时间标准化了,它利用 C++ 强类型的特点,明确区分时间点时间段,明确区分不同的时间单位

  • 时间点例子:2022年1月8日 13点07分10秒
  • 时间段例子:1分30秒
  • 时间点类型:chrono::steady_clock::time_point 等
  • 时间段类型:chrono::milliseconds,chrono::seconds,chrono::minutes 等
  • 方便的运算符重载:时间点+时间段=时间点,时间点-时间点=时间段

5.2.1 获取时间段 int64_t/double

auto t0 = chrono::steady_clock::now();// 获取当前时间点
auto t1 = t0 + chrono::seconds(30);// 当前时间点的30秒后
auto dt = t1 - t0;// 获取两个时间点的差(时间段)
int64_t sec = chrono::duration_cast<chrono::seconds>(dt).count();// 时间差的秒数

举个例子,计算一个步骤花费时间:

#include <iostream>
#include <chrono>

int main() {
    auto t0 = std::chrono::steady_clock::now();
    for (volatile int i = 0; i < 10000000; i++);
    auto t1 = std::chrono::steady_clock::now();
    auto dt = t1 - t0;
    int64_t ms = std::chrono::duration_cast<std::chrono::milliseconds>(dt).count();
    std::cout << "time elapsed: " << ms << " ms" << std::endl;
    return 0;
}

其中

typedef std::chrono::duration<int64_t, std::milli> std::chrono::milliseconds。

在例子中返回的是一个整数的毫秒数,如果想让精度超过毫秒,可以使用:

using double_ms = std::chrono::duration<double, std::milli>;

程序如下:

int main() {
    auto t0 = std::chrono::steady_clock::now();
    for (volatile int i = 0; i < 10000000; i++);
    auto t1 = std::chrono::steady_clock::now();
    auto dt = t1 - t0;
    using double_ms = std::chrono::duration<double, std::milli>;
    double ms = std::chrono::duration_cast<double_ms>(dt).count();
    std::cout << "time elapsed: " << ms << " ms" << std::endl;
    return 0;
}

原理为:
duration_cast 可以在任意的 duration 类型之间转换。
duration<T, R> 表示用 T 类型表示,且时间单位是 R
R 省略不写就是秒,std::milli 就是毫秒,std::micro 就是微秒
seconds 是 duration<int64_t> 的类型别名;milliseconds 是 duration<int64_t, std::milli> 的类型别名
这里我们创建了 double_ms 作为 duration<double, std::milli> 的别名

上面程序的结果可见下图所示,这样得到的结果就有小数点了。
在这里插入图片描述

5.2.2 this_thread

  • this_thread: this thread
    This namespace groups a set of functions that access the current thread.
    用于交互当前 thread。需要注意的是 this_thread 是命名空间(函数 get_idyield 定义在 thread 内,而 sleep_untilsleep_for在外)。

5.2.2.1 跨平台的 sleep: std::this_thread::sleep_for

  • std::this_thread::sleep_for: Sleep for time span
    Blocks execution of the calling thread during the span of time specified by rel_time.
    The execution of the current thread is stopped until at least rel_time has passed from now. Other threads continue their execution.

以前睡眠一段时间,不同操作系统使用不同的API。在 C++11 中可以用 std::this_thread::sleep_for 替代 Unix 类操作系统专有的 usleep。它可以让当前线程休眠一段时间,然后继续。(睡眠一个时间段)

这个 API 单位也可以自己指定,比如在下面示例中,使用 milliseconds 表示毫秒,也可以换成 microseconds 表示微秒,seconds 表示秒,chrono 的强类型让单位选择更自由。

int main() {
    std::this_thread::sleep_for(std::chrono::milliseconds(400));
    return 0;
}

这里就需要提到第四节说的线程的五种状态。线程与进程在使用时非常相似,在计算机中启动的多个线程都需要占用 CPU 资源,但是 CPU 的个数是有限的并且每个 CPU 在同一时间点不可能同时处理多个任务。为了能够实现并发处理,多个线程都是分时复用 CPU 时间片,快速的交替处理各个线程中的任务。因此多个线程之间需要争抢 CPU 时间片,抢到了就执行,抢不到则无法执行(因为默认所有的线程优先级都相同,内核也会从中调度,不会出现某个线程永远抢不到 CPU 时间片的情况)。

在调用 sleep_for 后,这个函数的线程会马上从运行态变成阻塞态并且在这种状态下休眠一定的时长,因为阻塞态的线程已经让出了 CPU 资源,代码也不会被执行,所以线程休眠过程中对 CPU 来说没有任何负担。

5.2.2.2 睡到时间点: std::this_thread::sleep_until

  • std::this_thread::sleep_until: Sleep until time point
    Blocks the calling thread until abs_time.
    The execution of the current thread is stopped until at least abs_time, while other threads may continue to advance.

除了接受一个时间段的 sleep_for,还有接受一个时间点的 sleep_until,表示当前线程休眠直到某个时间点。(睡眠到一个时间点)

在下面这个例程中,与 5.2.2.1 节中直接睡眠 400ms 是等价的。

int main() {
    auto t = std::chrono::steady_clock::now() + std::chrono::milliseconds(400);
    std::this_thread::sleep_until(t);
    return 0;
}

sleep_untilsleep_for 函数功能一样 ,前者基于时间点阻塞 ,后者基于时间段阻塞。

5.2.3 this_thread 内还有些什么(与时间管理无关)

5.2.3.1 get_id()

  • std::this_thread::get_id: Get thread id
    Returns the thread id of the calling thread. This value uniquely identifies the thread.

调用 get_id() 方法可以得到当前线程 ID。

5.2.3.2 yield()

  • std::this_thread::yield: Yield to other threads
    The calling thread yields, offering the implementation the opportunity to reschedule.
    This function shall be called when a thread waits for other threads to advance without blocking.

yield()主动由运行态退让出已经抢到的时间片,并转为就绪态,这样其他线程就能抢到 CPU 时间片。线程调用了 yield() 会主动放弃 CPU 资源,注意之后这个变为就绪态的线程会马上参与到下一轮 CPU 的抢夺战中,不排除它能继续抢到 CPU 时间片的情况。

比如以下例子:

void func() {
    for (size_t i = 0; i < 10000000; ++i){
        std::this_thread::yield();
        std::cout << "子线程ID:" << std::this_thread::get_id() << ",i = " << i << std::endl;
    }
}

int main() {
    std::cout << "主线程ID:" << std::this_thread::get_id() << std::endl;
    std::thread t1(func);
    std::thread t2(func);
    t1.join();
    t1.join();
    return 0;
}

func() 中的 for 循环会占用大量时间。在极端情况下,如果当前线程占用 CPU 资源不释放就会导致其他线程中的任务无法被处理,或者该线程每次都能抢到 CPU 时间片,导致其他线程中的任务没有机会被执行。解决方案就是每执行一次循环,让该线程主动放弃 CPU 资源,重新和其他线程再次抢夺 CPU 时间片,如果其他线程抢到了 CPU 时间片那么抢到时间片的线程就可以执行相应的任务了。

yield() 总结:

  1. yield() 的目的是避免一个线程长时间占用 CPU 资源,从而多线程处理能力下降;
  2. yield() 是让当前线程主动放弃自己抢到的 CPU 资源,但是在下一轮还会继续抢。

6. 线程

6.1 为什么需要多线程:无阻塞多任务

我们的一个独立程序常常需要同时处理多个任务。例如:后台在执行一个很耗时的任务,比如下载一个文件,同时还要和用户交互。这在 GUI 应用程序中很常见,比如浏览器在后台下载文件的同时,用户仍然可以用鼠标操作其 UI 界面。

现在来看下面例子:

#include <iostream>
#include <thread>
#include <string>

void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    download("hello.zip");
    interact();
    return 0;
}

在这个程序内,如果没有多线程,就必须等文件下载完了才能继续和用户Say Hi。下载完成前,整个界面都会处于“未响应”状态,用户想做别的事情就做不了。
在这里插入图片描述

6.2 现代 C++ 中的多线程:std::thread

老版本的 C 语言有 <pthread> 库,而从 C++11 开始,为多线程提供了语言级别的支持。他用 std::thread 这个类来表示线程。std::thread 构造函数的参数可以是任意 lambda 表达式。在这个线程启动时,就会执行 lambda 里的内容。这样就可以一边和用户交互,一边在另一个线程里慢吞吞下载文件了。

void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    std::thread t1([&] {
        download("hello.zip");
    });
    interact();
    return 0;
}

当直接编译这个代码时,会发现在链接时会出现问题。这是因为 std::thread 的实现背后是基于 pthread 的。需要在 CMakeLists.txt 里链接 Threads::Threads

cmake_minimum_required(VERSION 3.10)

set(CMAKE_CXX_STANDARD 17)

project(cpptest LANGUAGES CXX)

add_executable(cpptest main.cpp)

find_package(Threads REQUIRED)
target_link_libraries(cpptest PUBLIC Threads::Threads)

6.3 主线程等待子线程结束:t1.join()

  • std::thread::join: Join thread
    The function returns when the thread execution has completed.
    This synchronizes the moment this function returns with the completion of all the operations in the thread: This blocks the execution of the thread that calls this function until the function called on construction returns (if it hasn’t yet).
    After a call to this function, the thread object becomes non-joinable and can be destroyed safely.

现在已经有多线程了,文件下载用户交互分别在两个线程,同时独立运行。从而下载过程中也可以响应用户请求,提升了体验。

这时运行上面的程序,会发现一个问题:在输入完 ling 以后,程序的确及时地和我交互了。但是用户交互所在的主线程退出后,文件下载所在的子线程,因为从属于这个主线程,也被迫退出了。
在这里插入图片描述
因此,我们想要让主线程不要急着退出,等子线程也结束了再退出。可以用 std::thread 类的成员函数 join() 来等待刚刚创建的t1线程结束。

int main() {
    std::thread t1([&] {
        download("hello.zip");
    });
    interact();
    std::cout << "Waiting for child thread..." << std::endl;
    t1.join();
    std::cout << "Child thread exited!" << std::endl;
    return 0;
}

在子线程对象中调用 join() 函数,调用此函数的线程会被阻塞 ,但是子线程对象中的任务函数会继续执行 ,当任务执行完毕之后 join() 函数会清理当前子线程中的相关资源后返回,同时该线程函数会解除阻塞继续执行下去。函数在哪个线程中被执行,函数就阻塞那个函数。如果要阻塞主线程的执行,只需要在主线程中通过子线程对象调用这个方法即可,当调用这个方法的子线程对象中的任务函数执行完毕之后,主线程的阻塞也就随之解除了。

在上面的例子中,当主线程运行到 t1.join(),根据子线程对象 t1 的任务函数的执行情况,主线程会:

  1. 任务函数还没执行完毕,主线程阻塞直到任务执行完毕,主线程解除阻塞,继续向下执行;
  2. 任务函数已执行完毕,主线程不会阻塞,继续向下运行。

6.4 析构函数不再销毁线程:t1.detach()

  • std::thread::detach:Detach thread
    Detaches the thread represented by the object from the calling thread, allowing them to execute independently from each other.
    Both threads continue without blocking nor synchronizing in any way. Note that when either one ends execution, its resources are released.
    After a call to this function, the thread object becomes non-joinable and can be destroyed safely.

现在看下面这个例子:

void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

void myfunc() {
    std::thread t1([&] {
        download("hello.zip");
    });
    // 退出函数体时,会销毁 t1 线程的句柄!
}

int main() {
    myfunc();
    interact();
    return 0;
}

作为一个 C++ 类,std::thread 同样遵循 RAII 思想(RAII(Resource Acquisition Is Initialization),也称为“资源获取就是初始化”,是 C++ 等编程语言常用的管理资源、避免内存泄露的方法。它保证在任何情况下,使用对象时先构造对象,最后析构对象)和三五法则:因为管理着资源,它自定义了析构函数,删除了拷贝构造函数,但是提供了移动构造函数(举个例子,unique_ptr)。因此,当 t1 所在的函数 myfunc() 退出时,就会调用 std::thread 的析构函数,这会销毁 t1 线程。比如说在上面的例程中就会报以下错误:
在这里插入图片描述

这个时候就可以调用成员函数 detach() 分离该线程——意味着线程的生命周期不再由当前 std::thread 对象管理,而是在线程退出以后自动销毁自己

void myfunc() {
    std::thread t1([&] {
        download("hello.zip");
    });
    t1.detach();
    // t1 所代表的线程被分离了,不再随 t1 对象销毁
}

不过这样写还是有一个bug,就是之前说的,还是会在进程退出时候自动退出。(毕竟这里少了一个join())
在这里插入图片描述

6.5 析构函数不再销毁线程:移动到全局线程池

也就是说,detach() 的问题是进程退出时候不会等待所有子线程执行完毕。所以另一种解法是t1 对象移动到一个全局变量去,从而延长其生命周期到 myfunc() 函数体外。这样就可以等下载完再退出了。

void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

std::vector<std::thread> pool;

void myfunc() {
    std::thread t1([&] {
        download("hello.zip");
    });
    // 移交控制权到全局的 pool 列表,以延长 t1 的生命周期
    pool.push_back(std::move(t1));
}

int main() {
    myfunc();
    interact();
    for (auto &t: pool) t.join();  // 等待池里的线程全部执行完毕
    return 0;
}

但是需要在 main 里面手动 join 全部线程还是有点麻烦,我们可以自定义一个类 ThreadPool,并用它创建一个全局变量,其析构函数会在 main 退出后自动调用。(毕竟这里创建的是一个全局变量)

void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

// 这个写法很有意思,析构函数在释放的时候,等所有线程执行完毕才退出
class ThreadPool {
    std::vector<std::thread> m_pool;

public:
    void push_back(std::thread thr) {
        m_pool.push_back(std::move(thr));
    }

    ~ThreadPool() {                      // main 函数退出后会自动调用
        for (auto &t: m_pool) t.join();  // 等待池里的线程全部执行完毕
    }
};

ThreadPool tpool;

void myfunc() {
    std::thread t1([&] {
        download("hello.zip");
    });
    // 移交控制权到全局的 pool 列表,以延长 t1 的生命周期
    tpool.push_back(std::move(t1));
}

int main() {
    myfunc();
    interact();
    return 0;
}

6.6 C++20 std::jthread:符合 RAII 思想,析构时自动 join()

但是在上一节的内容中,还是需要自定义析构函数。C++20 引入了 std::jthread 类,和 std::thread 不同在于:它的析构函数里会自动调用 join() 函数,从而保证 pool 析构时会自动等待全部线程执行完毕。(注意 CMakeLists.txt 内 C++ 标准要修改为 set(CMAKE_CXX_STANDARD 20))

#include <iostream>
#include <thread>
#include <string>
#include <vector>

void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

// ~jthread() 解构函数里会自动调用 join(),如果 joinable() 的话
std::vector<std::jthread> pool;

void myfunc() {
    std::jthread t1([&] {
        download("hello.zip");
    });
    // 通过move移交控制权到全局的 pool 列表,以延长 t1 的生命周期(所以这个时候还没有析构)
    pool.push_back(std::move(t1)); //合理,就该通过move
}

int main() {
    myfunc();
    interact();
    return 0;
}

6.7 joinable()

  • std::thread::joinable:Check if joinable
    Returns whether the thread object is joinable.
    A thread object is joinable if it represents a thread of execution.
    A thread object is not joinable in any of these cases:
    • if it was default-constructed.(如下面例子中的 std::thread foo; thread 对象不允许默认构造函数)
    • if it has been moved from (either constructing another thread object, or assigning to it).
    • if either of its members join or detach has been called.(已经 join 了,那么肯定不能再次 join 了嘛)

就是字面意思,用于判断主线程和子线程是否处于关联状态,返回 true/false。

// example for thread::joinable
#include <iostream>       // std::cout
#include <thread>         // std::thread
 
void mythread() 
{
  // do stuff...
}
 
int main() 
{
  std::thread foo;
  std::thread bar(mythread);

  std::cout << "Joinable after construction:\n" << std::boolalpha;
  std::cout << "foo: " << foo.joinable() << '\n';
  std::cout << "bar: " << bar.joinable() << '\n';

  if (foo.joinable()) foo.join();
  if (bar.joinable()) bar.join();

  std::cout << "Joinable after joining:\n" << std::boolalpha;
  std::cout << "foo: " << foo.joinable() << '\n';
  std::cout << "bar: " << bar.joinable() << '\n';

  return 0;
}

返回结果为:

Joinable after construction:
foo: false
bar: true
Joinable after joining:
foo: false
bar: false ##都运行结束了肯定就不能join了

7. 异步

同步:如上面例子,下载完文件,才能和用户交互;
异步:下载文件的过程中,阻塞了,在等待网络请求,这时候将自动切换到和用户交互的线程上,用户体验将不会下降。

  • <future> 头文件
    Header with facilities that allow asynchronous access to values set by specific providers, possibly in a different thread.
    Each of these providers (which are either promise or packaged_task objects, or calls to async) share access to a shared state with a future object: the point where the provider makes the shared state ready is synchronized with the point the future object accesses the shared state.

  • std::future:A future is an object that can retrieve a value from some provider object or function, properly synchronizing this access if in different threads.

7.1 异步好帮手:std::async

  • std::async: Call function asynchronously
    The function temporarily stores in the shared state either the threading handler used or decay copies of fn and args (as a deferred function) without making it ready. Once the execution of fn is completed, the shared state contains the value returned by fn and is made ready.

  • std::future::get

std::async 接受一个带返回值的 lambda(相比于 thread,是一种更高层的异步方式。它不必显式手写 join 函数来让主线程等待子线程,可以用 std::future 类型的变量来接收线程函数运行的结果,并通过 get() 的方法来获得结果。这样就不必像 thread 那样提前定义一个全局变量,在线程函数中进行赋值操作),自身返回一个 std::future 对象。lambda 的函数体将在另一个线程里执行。比如下面这个例子:

#include <iostream>
#include <string>
#include <thread>
#include <future>

int download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
    return 404;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    std::future<int> fret = std::async([&] {
        return download("hello.zip"); 
    });
    interact();
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;
    return 0;
}

因为 download() 返回的是一个 int 类型,所以 std::async 返回的类型是 std::future<int>,future 代表这个 int 现在还没有,但是保证未来会有。
这时可以在 main() 里面做一些别的事情,download() 会持续在后台悄悄运行。最后调用 future 的 get() 方法,如果此时 download() 还没完成,会等待 download() 完成,并获取 download() 的返回值。
在调用 fret.get() 时,才有可能发生阻塞。(有点像 join,但是多返回了一个值)

7.2 显式地等待:wait()

  • std::future::wait:Wait for ready
    Waits for the shared state to be ready.
    If the shared state is not yet ready (i.e., the provider has not yet set its value or exception), the function blocks the calling thread and waits until it is ready.
    Once the shared state is ready, the function unblocks and returns without reading its value nor throwing its set exception (if any).
    All visible side effects are synchronized between the point the provider makes the shared state ready and the return of this function.

除了 get() 会等待线程执行完毕外,wait() 也可以等待它执行完,但是不会返回其值。(后面还是可以使用 get() 取值)

int main() {
    std::future<int> fret = std::async([&] {
        return download("hello.zip"); 
    });
    interact();
    std::cout << "Waiting for download complete..." << std::endl;
    fret.wait();
    std::cout << "Wait returned!" << std::endl;
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;
    return 0;
}

7.3 等待一段时间:wait_for()

  • std::future::wait_for:Wait for ready during time span
    Waits for the shared state to be ready for up to the time specified by rel_time.
    If the shared state is not yet ready (i.e., the provider has not yet set its value or exception), the function blocks the calling thread and waits until it is ready or until rel_time has elapsed, whichever happens first.
    When the function returns because its shared state is made ready, the value or exception set on the shared state is not read, but all visible side effects are synchronized between the point the provider makes the shared state ready and the return of this function.
    If the shared state contains a deferred function (such as future objects returned by async), the function does not block, returning immediately with a value of future_status::deferred.

wait() 存在一个问题,只要线程没有执行完,wait() 会无限等下去。而 wait_for() 则可以指定一个最长等待时间,用 chrono 里的类表示单位。他会返回一个 std::future_status 表示等待是否成功。如果超过这个时间线程还没有执行完毕,则放弃等待,返回 future_status::timeout;如果线程在指定的时间内执行完毕,则认为等待成功,返回 future_status::ready

比如下面这个例子中,在等待过程中会有输出:

int main() {
    std::future<int> fret = std::async([&] {
        return download("hello.zip"); 
    });
    interact();
    while (true) {
        std::cout << "Waiting for download complete..." << std::endl;
        auto stat = fret.wait_for(std::chrono::milliseconds(1000));
        if (stat == std::future_status::ready) {
            std::cout << "Future is ready!!" << std::endl;
            break;
        } else {
            std::cout << "Future not ready!!" << std::endl;
        }
    }
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;
    return 0;
}

在这里插入图片描述
同理还有 wait_until() 其参数是一个时间点。

7.4 另一种用法:std::launch::deferred 做参数

  • std::launch::deferred:Wait for ready during time span
    Deferred: The call to fn is deferred until the shared state of the returned future is accessed (with wait or get). At that point, fn is called and the function is no longer considered deferred. When this call returns, the shared state of the returned future is made ready.

刚刚说 std::async 会创建一个线程在后台执行,如果不想创建一个线程,可以考虑使用 std::launch::deferred 做参数。

std::async 的第一个参数可以设为 std::launch::deferred,这时不会创建一个线程来执行,它只会把 lambda 函数体内的运算推迟到 future 的 get() 被调用时。也就是 main 中的 interact() 计算完毕后。

这种写法,download 的执行仍在主线程中,它只是函数式编程范式意义上的异步,而不涉及到真正的多线程。(这不并行,但这很并发)可以用这个实现惰性求值(lazy evaluation)之类。

int main() {
    std::future<int> fret = std::async(std::launch::deferred, [&] {
        return download("hello.zip"); 
    });
    interact();
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;
    return 0;
}

7.5 std::async 的底层实现:std::promise

  • std::promise:Promise
    A promise is an object that can store a value of type T to be retrieved by a future object (possibly in another thread), offering a synchronization point.
    On construction, promise objects are associated to a new shared state on which they can store either a value of type T or an exception derived from std::exception.
    This shared state can be associated to a future object by calling member get_future. After the call, both objects share the same shared state:
    • The promise object is the asynchronous provider and is expected to set a value for the shared state at some point.
    • The future object is an asynchronous return object that can retrieve the value of the shared state, waiting for it to be ready, if necessary.
      The lifetime of the shared state lasts at least until the last object with which it is associated releases it or is destroyed. Therefore it can survive the promise object that obtained it in the first place if associated also to a future.
int main() {
    std::promise<int> pret;
    std::thread t1([&] {
        auto ret = download("hello.zip");
        pret.set_value(ret); 
    });
    std::future<int> fret = pret.get_future();

    interact();
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;

    t1.join();
    return 0;
}

如果不想让 std::async 帮你自动创建线程,想要手动创建线程,可以直接用 std::promise。然后在线程返回的时候,用 set_value() 设置返回值。在主线程里,用 get_future() 获取其 std::future 对象,进一步 get() 可以等待并获取线程返回值。这个原理跟 std::async 一样,不过 std::async 帮你包装好了。

7.6 std::future 小贴士

future 为了三五法则,删除了拷贝构造函数(同上)。如果需要浅拷贝,实现共享同一个 future 对象,可以用 std::shared_future
如果不需要返回值,std::async 里 lambda 的返回类型可以为 void, 这时 future 对象的类型为 std::future<void>
同理有 std::promise<void>,它的 set_value() 不接受参数,仅仅作为同步用,不传递任何实际的值。

#include <iostream>
#include <string>
#include <thread>
#include <future>

void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    //注意,使用的是shared_future
    std::shared_future<void> fret = std::async([&] {
        download("hello.zip"); 
    });
    auto fret2 = fret;
    auto fret3 = fret;
    interact();
    fret3.wait();
    std::cout << "Download completed" << std::endl;
    return 0;
}

8. 互斥量

  • <mutex> 头文件
    Header with facilities that allow mutual exclusion (mutex) of concurrent execution of critical sections of code, allowing to explicitly avoid data races.

多线程有个经典案例如下:

int main() {
	std::vector<int> arr;
	std::thread t1([&] {
		for (int i = 0; i < 1000; i++) {
			arr.push_back(1);
		}
	});
	std::thread t2([&] {
		for (int i = 0; i < 1000; i++) {
			arr.push_back(1);
		}
	});
	t1.join();
	t2.join();
	return 0;
}

两个线程试图往同一个数据里堆数据。运行程序的时候会发生崩溃。为什么?vector 不是多线程安全(MT-safe)的容器。多个线程同时访问同一个 vector 会出现数据竞争(data-race)现象

在这里插入图片描述

8.1 std::mutex:上锁,防止多个线程同时进入某一代码段

  • std::mutex:Mutex class
    A mutex is a lockable object that is designed to signal when critical sections of code need exclusive access, preventing other threads with the same protection from executing concurrently and access the same memory locations.
    mutex objects provide exclusive ownership and do not support recursivity (i.e., a thread shall not lock a mutex it already owns) – see recursive_mutex for an alternative class that does.
    It is guaranteed to be a standard-layout class.

调用 std::mutexlock() 时,会检测 mutex 是否已经上锁。如果没有锁定,则对 mutex 进行上锁。如果已经锁定则陷入等待,即被 lock() 阻塞,直到 mutex 被另一个线程解锁后,才再次上锁(这里很重要的一点是,它是会等待的)。而调用 unlock() 则会进行解锁操作。这样,就可以保证 mtx.lock() 和 mtx.unlock() 之间的代码段,同一时间只有一个线程在执行,从而避免数据竞争。(mutex 是个厕所,A 同学在里面,B 同学就不能进去,要等 A 同学用完了才能进去…)

#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
    std::vector<int> arr;
    std::mutex mtx;
    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx.lock();
            arr.push_back(1);
            mtx.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx.lock();
            arr.push_back(2);
            mtx.unlock();
        }
    });
    t1.join();
    t2.join();
    return 0;
}

8.2 std::lock_guard:符合 RAII 思想的上锁和解锁

  • std::lock_guard:Lock guard
    A lock guard is an object that manages a mutex object by keeping it always locked.
    On construction, the mutex object is locked by the calling thread, and on destruction, the mutex is unlocked. It is the simplest lock, and is specially useful as an object with automatic duration that lasts until the end of its context. In this way, it guarantees the mutex object is properly unlocked in case an exception is thrown.
    Note though that the lock_guard object does not manage the lifetime of the mutex object in any way: the duration of the mutex object shall extend at least until the destruction of the lock_guard that locks it.

当然,在上面的情况下,要是忘记 unlock() 了,那么不就卡死在这了。

根据 RAII 思想,可将锁的持有视为资源,上锁视为锁的获取,解锁视为锁的释放。std::lock_guard 就是这样一个工具类,它的构造函数里会调用 mtx.lock(),析构函数会调用 mtx.unlock()从而退出函数作用域时能够自动解锁,避免程序员粗心不小心忘记解锁。(好想法!)

int main() {
    std::vector<int> arr;
    std::mutex mtx;
    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock_guard grd(mtx);
            arr.push_back(1);
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock_guard grd(mtx);
            arr.push_back(2);
        }
    });
    t1.join();
    t2.join();
    return 0;
}

8.3 std::unique_lock:也符合 RAII 思想,但自由度更高

  • std::unique_lock:Unique lock
    A unique lock is an object that manages a mutex object with unique ownership in both states: locked and unlocked.
    On construction (or by move-assigning to it), the object acquires a mutex object, for whose locking and unlocking operations becomes responsible.
    The object supports both states: locked and unlocked.
    This class guarantees an unlocked status on destruction (even if not called explicitly). Therefore it is especially useful as an object with automatic duration, as it guarantees the mutex object is properly unlocked in case an exception is thrown.
    Note though, that the unique_lock object does not manage the lifetime of the mutex object in any way: the duration of the mutex object shall extend at least until the destruction of the unique_lock that manages it.

lock_guard 还是存在一定的局限性,比如说必须在作用域结束才会释放,而不能提前释放。也就是说,std::lock_guard 严格在析构时 unlock(),但是有时候我们会希望提前 unlock()。这时可以用 std::unique_lock,它额外存储了一个 flag 表示是否已经被释放,会在解构检测这个 flag,如果没有释放,则调用 unlock(),否则不调用。然后可以直接调用 unique_lockunlock() 函数来提前解锁,但是即使忘记解锁也没关系,退出作用域时候他还会自动检查一遍要不要解锁。

int main() {
    std::vector<int> arr;
    std::mutex mtx;
    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::unique_lock grd(mtx);
            arr.push_back(1);
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::unique_lock grd(mtx);
            arr.push_back(2);
            grd.unlock();
            printf("outside of lock\n");
            // grd.lock();  // 如果需要,还可以重新上锁
        }
    });
    t1.join();
    t2.join();
    return 0;
}

8.3.1 std::unique_lock:用 std::defer_lock 作为参数

std::unique_lock 的构造函数还可以有一个额外参数,那就是 std::defer_lock指定了这个参数的话,std::unique_lock 不会在构造函数中调用 mtx.lock(),需要之后再手动调用 grd.lock() 才能上锁。好处依然是即使忘记 grd.unlock() 也能够自动调用 mtx.unlock()

int main() {
    std::vector<int> arr;
    std::mutex mtx;
    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::unique_lock grd(mtx);
            arr.push_back(1);
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::unique_lock grd(mtx, std::defer_lock);
            printf("before the lock\n");
            grd.lock(); // 手动上锁
            arr.push_back(2);
            grd.unlock();
            printf("outside of lock\n");
        }
    });
    t1.join();
    t2.join();
    return 0;
}

例程中开始 grd 是没有上锁的,在调用 grd.lock() 后,才上的锁。

可以看一下 std::defer_lock_t,是个空的类,其实就是为了做构造函数的重载。

8.4 多个对象?每个对象一个 mutex 即可

mtx1 用来锁定 arr1,mtx2 用来锁定 arr2。不同的对象,各有一个 mutex,独立地上锁,可以避免不必要的锁定,提升高并发时的性能。(即两个厕所两把锁,锁孔不同…)

用了一个 {} 包住 std::lock_guard,限制其变量的作用域,从而可以让它在 } 之前析构并调用 unlock(),即在确定的时间解锁,也避免了和下面一个 lock_guard 变量名冲突

int main() {
    std::vector<int> arr1;
    std::mutex mtx1;

    std::vector<int> arr2;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            {
                std::lock_guard grd(mtx1);
                arr1.push_back(1);
            }

            {
                std::lock_guard grd(mtx2);
                arr2.push_back(1);
            }
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            {
                std::lock_guard grd(mtx1);
                arr1.push_back(2);
            }

            {
                std::lock_guard grd(mtx2);
                arr2.push_back(2);
            }
        }
    });
    t1.join();
    t2.join();
    return 0;
}

8.5 如果上锁失败,不要等待:try_lock()

  • std::try_lock:Try to lock multiple mutexes
    Attempts to lock all the objects passed as arguments using their try_lock member functions (non-blocking).
    The function calls the try_lock member function for each argument (first a, then b, and eventually the others in cde, in the same order), until either all calls are successful, or as soon as one of the calls fails (either by returning false or throwing an exception).
    If the function ends because a call fails, unlock is called on all objects for which the call to try_lock was successful, and the function returns the argument order number of the object whose lock failed. No further calls are performed for the remaining objects in the argument list.

我们说过 lock() 如果发现 mutex 已经上锁的话,会等待它直到解锁。也可以用无阻塞的 try_lock(),他在上锁失败时不会陷入等待,而是直接返回 false;如果上锁成功,则会返回 true。比如下面这个例子,第一次上锁,因为还没有人上锁,所以成功了,返回 true。第二次上锁,由于自己已经上锁,所以失败了,返回 false。

#include <cstdio>
#include <mutex>

std::mutex mtx1;

int main() {
    if (mtx1.try_lock())
        printf("succeed\n");
    else
        printf("failed\n");

    if (mtx1.try_lock())
        printf("succeed\n");
    else
        printf("failed\n");

    mtx1.unlock();
    return 0;
}

(这里有点奇怪,我在使用该示例和官方示例的时候,try_lock不管怎样均返回true,或者连续lock两次也不会出现死锁的问题。可能是编译器做了优化?现在在主线程上,稍加修改输出就正常了。怀疑是 gcc 版本较低导致 <=9,换台电脑输出也是正常的 gcc=11)

#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx1;

int main() {
    std::vector<int> arr1;
    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            {
                std::lock_guard grd(mtx1);
                arr1.push_back(1);
            }
        }
    });
    if (mtx1.try_lock())
        printf("succeed\n");
    else
        printf("failed\n");

    if (mtx1.try_lock())
        printf("succeed\n");
    else
        printf("failed\n");

    mtx1.unlock();
    t1.join();
    return 0;
}

8.6 只等待一段时间:try_lock_for()

try_lock() 碰到已经上锁的情况,会立即返回 false。如果需要等待,但仅限一段时间,可以用 std::timed_mutextry_lock_for() 函数,它的参数是最长等待时间,同样是由 chrono 指定时间单位。超过这个时间还没成功就会“不耐烦地”失败并返回 false;如果这个时间内上锁成功则返回 true。同理还有接受时间点的 try_lock_until()

std::timed_mutex mtx1;

int main() {
    if (mtx1.try_lock_for(std::chrono::milliseconds(500)))
        printf("succeed\n");
    else
        printf("failed\n");

    if (mtx1.try_lock_for(std::chrono::milliseconds(500)))
        printf("succeed\n");
    else
        printf("failed\n");

    mtx1.unlock();
    return 0;
}

8.7 std::unique_lock补充

8.7.1 用 std::try_to_lock 做参数(即作为std::unique_lock的参数)

和无参数相比,它会调用 mtx1.try_lock() 而不是 mtx1.lock()。之后,可以用 grd.owns_lock() 判断是否上锁成功。

#include <cstdio>
#include <thread>
#include <mutex>

int main() {
    std::mutex mtx;
    std::thread t1([&] {
        std::unique_lock grd(mtx, std::try_to_lock);
        if (grd.owns_lock())
            printf("t1 success\n");
        else
            printf("t1 failed\n");
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    });

    std::thread t2([&] {
        std::unique_lock grd(mtx, std::try_to_lock);
        if (grd.owns_lock())
            printf("t2 success\n");
        else
            printf("t2 failed\n");
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    });

    t1.join();
    t2.join();
    return 0;
}

8.7.2 用 std::adopt_lock 做参数

std::defer_lock 是之后再上锁,而 std::adopt_lock 是之前已经上锁了,再告诉它(如果没上锁,则上锁?)。
如果当前 mutex 已经上锁了,但是之后仍然希望用 RAII 思想在析构时候自动调用 unlock(),可以用 std::adopt_lock 作为 std::unique_lock 或 std::lock_guard 的第二个参数,这时它们会默认 mtx 已经上锁。

int main() {
    std::mutex mtx;
    std::thread t1([&] {
        std::unique_lock grd(mtx);
        printf("t1 owns the lock\n");
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    });

    std::thread t2([&] {
        mtx.lock();
        std::unique_lock grd(mtx, std::adopt_lock);
        printf("t2 owns the lock\n");
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    });

    t1.join();
    t2.join();
    return 0;
}

如上述代码所示,在使用 std::unique_lock grd(mtx, std::adopt_lock) 前,已使用 mtx.lock() 上了锁。而在这里因为后面不想 unlock,这里这样使用就可以自动调用 unique_lock 的析构函数来解锁了。

顺带一提,std::try_to_lockstd::adopt_lock 也都是空的,目的就是做构造函数重载。

8.8 std::unique_lock 和 std::mutex 具有同样的接口

std::unique_lock 本身还可以再调用 std::lock_guard
其实 std::unique_lock 具有 mutex 的所有成员函数:lock(), unlock(), try_lock(), try_lock_for() 等。除了它会在析构时按需自动调用 unlock()。因为 std::lock_guard 无非是调用其构造参数名为 lock() 的成员函数,所以 std::unique_lock 也可以作为 std::lock_guard 的构造参数!这种只要具有某些指定名字的成员函数,就判断一个类是否满足某些功能的思想,在 Python 称为鸭子类型,而 C++ 称为 concept(概念)。比起虚函数和动态多态的接口抽象,concept 使实现和接口更加解耦合且没有性能损失。(这里是移动构造了?如果在 std::lock_guard grd2(grd) 下面加一个 grd.unlock(),程序会崩)

#include <cstdio>
#include <thread>
#include <mutex>

int main() {
    std::mutex mtx;
    std::thread t1([&] {
        std::unique_lock grd(mtx, std::defer_lock);
        std::lock_guard grd2(grd);
        printf("t1 owns the lock\n");
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    });

    std::thread t2([&] {
        std::unique_lock grd(mtx, std::defer_lock);
        std::lock_guard grd2(grd);
        printf("t2 owns the lock\n");
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    });

    t1.join();
    t2.join();
    return 0;
}

9. 死锁

由于同时执行的两个线程,他们中发生的指令不一定是同步的,因此有可能出现这种情况:

  • t1 执行 mtx1.lock()。
  • t2 执行 mtx2.lock()。
  • t1 执行 mtx2.lock():失败,陷入等待
  • t2 执行 mtx1.lock():失败,陷入等待

双方都在等着对方释放锁,但是因为等待而无法释放锁,从而要无限制等下去。这种现象称为死锁(dead-lock)。(同时锁住多个 mutex)

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx2.lock();
            mtx2.unlock();
            mtx1.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx2.lock();
            mtx1.lock();
            mtx1.unlock();
            mtx2.unlock();
        }
    });
    t1.join();
    t2.join();
    return 0;
}

9.1 解决方案

9.1.1 永远不要同时持有两个锁

最为简单的方法,就是一个线程永远不要同时持有两个锁,分别上锁,这样就可以避免死锁。因此这里双方都在 mtx1.unlock() 之后才 mtx2.lock(),从而也不会出现一方等着对方的同时持有了对方等着的锁的情况。

int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx1.unlock();
            mtx2.lock();
            mtx2.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx2.lock();
            mtx2.unlock();
            mtx1.lock();
            mtx1.unlock();
        }
    });
    t1.join();
    t2.join();
    return 0;
}

9.1.2 保证双方上锁顺序一致

其实,只需保证双方上锁的顺序一致,即可避免死锁。因此这里调整 t2 也变为先锁 mtx1,再锁 mtx2。这时,无论实际执行顺序是怎样,都不会出现一方等着对方的同时持有了对方等着的锁的情况。

int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx2.lock();
            mtx2.unlock();
            mtx1.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx2.lock();
            mtx2.unlock();
            mtx1.unlock();
        }
    });
    t1.join();
    t2.join();
    return 0;
}

9.1.3 用 std::lock 同时对多个上锁

如果没办法保证上锁顺序一致,可以用标准库的 std::lock(mtx1, mtx2, ...) 函数,一次性对多个 mutex 上锁。它接受任意多个 mutex 作为参数,并且它保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题

int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock(mtx1, mtx2);
            mtx1.unlock();
            mtx2.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock(mtx2, mtx1);
            mtx2.unlock();
            mtx1.unlock();
        }
    });
    t1.join();
    t2.join();
    return 0;
}

9.1.4 std::lock 的 RAII 版本:std::scoped_lock

和 std::lock_guard 相对应,std::lock 也有 RAII 的版本 std::scoped_lock。只不过它可以同时对多个 mutex 上锁。(C++17 里对于 lock_guard 的升级,可以一口气 lock 任意个 mutex,保证不会死锁)

int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::scoped_lock grd(mtx1, mtx2); // 同时上了两个锁
            // do something
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::scoped_lock grd(mtx2, mtx1);
            // do something
        }
    });
    t1.join();
    t2.join();
    return 0;
}

9.2 同一个线程重复调用 lock() 也会造成死锁

除了两个线程同时持有两个锁会造成死锁外,即使只有一个线程一个锁,如果 lock() 以后又调用 lock(),也会造成死锁。比如下面示例的 func 函数,上了锁之后,又调用了 other 函数,它也需要上锁。而 other 看到 mtx1 已经上锁,还以为是别的线程上的锁,于是陷入等待。殊不知是调用他的 func 上的锁,other 陷入等待后 func 里的 unlock() 永远得不到调用。

#include <iostream>
#include <mutex>

std::mutex mtx1;

void other() {
    mtx1.lock();
    // do something
    mtx1.unlock();
}

void func() {
    mtx1.lock();
    other();
    mtx1.unlock();
}

int main() {
    func();
    return 0;
}

9.2.1 解决1:other 里不要再上锁

遇到这种情况最好是把 other 里的 lock() 去掉,并在其文档中说明:“other 不是线程安全的,调用本函数之前需要保证某 mutex 已经上锁。”(就是写个文档提醒自己小心)

std::mutex mtx1;

/// NOTE: please lock mtx1 before calling other()
void other() {
    // do something
}

void func() {
    mtx1.lock();
    other();
    mtx1.unlock();
}

int main() {
    func();
    return 0;
}

9.2.2 改用 std::recursive_mutex

如果实在不能改的话,可以用 std::recursive_mutex。它会自动判断是不是同一个线程 lock() 了多次同一个锁,如果是则让计数器加 1 1 1,之后 unlock() 会让计数器减 1 1 1,减到 0 0 0 时才真正解锁。但是相比普通的 std::mutex 有一定性能损失。同理还有 std::recursive_timed_mutex,如果你同时需要 try_lock_for() 的话。

std::recursive_mutex mtx1;

void other() {
    mtx1.lock();
    // do something
    mtx1.unlock();
}

void func() {
    mtx1.lock();
    other();
    mtx1.unlock();
}

int main() {
    func();
    return 0;
}

10. 数据结构

10.1 封装一个线程安全的 vector

在前面章节说过,vector 不是多线程安全的容器。多个线程同时访问同一个 vector 会出现数据竞争(data-race)现象。

因此,可以用一个类封装一下对 vector 的访问,使其访问都受到一个 mutex 的保护

class MTVector {
    std::vector<int> m_arr;
    std::mutex m_mtx;

public:
    void push_back(int val) {
        m_mtx.lock();
        m_arr.push_back(val);
        m_mtx.unlock();
    }

    size_t size() const {
        m_mtx.lock();
        size_t ret = m_arr.size();
        m_mtx.unlock();
        return ret;
    }
};

int main() {
    MTVector arr;

    std::thread t1([&] () {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(i);
        }
    });

    std::thread t2([&] () {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(1000 + i);
        }
    });

    t1.join();
    t2.join();

    std::cout << arr.size() << std::endl;

    return 0;
}

然而却出错了:因为 size() 是 const 函数,而 mutex::lock() 却不是 const 的。(这里将size()后面的 const 去掉就可以编译了,但写成size_t size() const{}肯定正规多了,毕竟原来的vector就是这样定义的)
在这里插入图片描述

10.1.1 逻辑上 const 而部分成员非 const:mutable

我们要为了支持 mutex 而放弃声明 size() 为 const 吗?不必,size() 在逻辑上仍是 const 的。因此,为了让 this 为 const 时仅仅给 m_mtx 开后门,可以用 mutable 关键字修饰它,从而所有成员里只有它不是 const 的。(就是多加了一个 mutable 关键字)

class MTVector {
    std::vector<int> m_arr;
    mutable std::mutex m_mtx;

public:
    void push_back(int val) {
        m_mtx.lock();
        m_arr.push_back(val);
        m_mtx.unlock();
    }

    size_t size() const {
        m_mtx.lock();
        size_t ret = m_arr.size();
        m_mtx.unlock();
        return ret;
    }
};

10.2 为什么需要读写锁?

之前说过 mutex 就像厕所,同一时刻只有一个人能上。但是如果“上”有两种方式呢?假设在平行世界,厕所不一定是用来拉的,还可能是用来喝的,喝厕所里的水时,可以多个人插着吸管一起喝。而拉的时候,只能一个人独占厕所,不能多个人一起往里面拉。
喝水的人如果发现厕所里已经有人在拉,那他也不能去喝,否则会喝到“脏数据”。

  • 结论:可以共享必须独占,且写和读不能共存
    针对这种更具体的情况,又发明了读写锁,它允许的状态有:
    1. n个人读取,没有人写入。
    2. 1个人写入,没有人读取。
    3. 没有人读取,也没有人写入。

10.2.1 读写锁:shared_mutex

为此,C++17 提供了 std::shared_mutex
上锁时,要指定你的需求是拉还是喝,负责调度的读写锁会帮你判断要不要等待
这里 push_back() 需要修改数据,因此需求为,使用 lock()unlock() 的组合。
而 size() 则只要读取数据,不修改数据,因此可以和别人共享一起,使用 lock_shared()unlock_shared() 的组合。

#include <iostream>
#include <thread>
#include <vector>
#include <shared_mutex>

class MTVector {
    std::vector<int> m_arr;
    mutable std::shared_mutex m_mtx;

public:
    void push_back(int val) {
        m_mtx.lock();
        m_arr.push_back(val);
        m_mtx.unlock();
    }

    size_t size() const {
        m_mtx.lock_shared();
        size_t ret = m_arr.size();
        m_mtx.unlock_shared();
        return ret;
    }
};

int main() {
    MTVector arr;

    std::thread t1([&] () {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(i);
        }
    });

    std::thread t2([&] () {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(1000 + i);
        }
    });

    t1.join();
    t2.join();

    std::cout << arr.size() << std::endl;

    return 0;
}

10.2.2 std::shared_lock:符合 RAII 思想的 lock_shared()

正如 std::unique_lock 针对 lock(),也可以用 std::shared_lock 针对 lock_shared()。这样就可以在函数体退出时自动调用 unlock_shared(),更加安全了。
shared_lock 同样支持 defer_lock 做参数,owns_lock() 判断等。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <shared_mutex>

class MTVector {
    std::vector<int> m_arr;
    mutable std::shared_mutex m_mtx;

public:
    void push_back(int val) {
        std::unique_lock grd(m_mtx);
        m_arr.push_back(val);
    }

    size_t size() const {
        std::shared_lock grd(m_mtx);
        return m_arr.size();
    }
};

int main() {
    MTVector arr;

    std::thread t1([&] () {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(i);
        }
    });

    std::thread t2([&] () {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(1000 + i);
        }
    });

    t1.join();
    t2.join();

    std::cout << arr.size() << std::endl;

    return 0;
}

10.2.3 只需一次性上锁,且符合 RAII 思想:访问者模式

再来看下面一个例子,MTVector为用来存储数据的类,而Accessor是用来访问数据的类,它们被区分了开来。这里的一个原因就是使用锁std::unique_lock<std::mutex> m_guard;。比如前面一小节中,我们使用std::unique_lock grd(m_mtx);std::shared_lock grd(m_mtx);在每一次循环中上锁和解锁,这样非常低效,而用访问者就可以进行一次性上锁

如何做到一次性上锁呢?因为 Accessor 里面有一个unique_lock类型,就是 Accessor 初始化的时候,会先锁住这个mutex。然后在 Accessor axr 析构的时候,才会去把这个锁解锁掉。也就是说这里面只定义了一次上锁一次解锁,从而它就合并了多次上锁(这里有个push_back的loop调用,不需要每一个push_back上个锁再解锁),对性能有帮助,且能够分离实际对象的存储与访问-------存储是外面一个类,而访问是里面额外的一个类。可以通过access来获取一个存储对象的访问者类。

特别是在 GPU 上,这个访问者模式就很重要,因为没办法把一个 vector 拷到 GPU 上。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

class MTVector {
    std::vector<int> m_arr;
    std::mutex m_mtx;

public:
    class Accessor {
        MTVector &m_that;
        std::unique_lock<std::mutex> m_guard;

    public:
        Accessor(MTVector &that)
            : m_that(that), m_guard(that.m_mtx)
        {}

        void push_back(int val) const {
            return m_that.m_arr.push_back(val);
        }

        size_t size() const {
            return m_that.m_arr.size();
        }
    };

    Accessor access() {
        return {*this};
    }
};

int main() {
    MTVector arr;

    std::thread t1([&] () {
        auto axr = arr.access();
        std::cout << "t1 start.\n";
        for (int i = 0; i < 1000; i++) {
            axr.push_back(i);
        }
        std::cout << "t1 end.\n";
    });

    std::thread t2([&] () {
        auto axr = arr.access();
        std::cout << "t2 start.\n";
        for (int i = 0; i < 1000; i++) {
            axr.push_back(1000 + i);
        }
        std::cout << "t2 end.\n";
    });

    t1.join();
    t2.join();

    std::cout << arr.access().size() << std::endl;

    return 0;
}

这里的打印结果为:

t1 start.    | t2 start.
t1 end.      | t2 end.
t2 start.    | t1 start.
t2 end.      | t1 end.
2000         | 2000

这是因为在 t1 或 t2 在 access() 时,创建了一个 Accessor 将线程锁了起来,仅供 t1 的操作,在该线程结束时析构,锁被解掉,另一线程即可使用。(但这里和前面示例是有区别的,前面的示例在于交错着往 vector 存东西,这里有一个线程在另一线程结束前会一直阻塞)

Accessor 或者说 Viewer 模式,常用于设计 GPU 容器 OpenVDB 数据结构的访问,也可以采用 Accessor 的设计……并且还有 ConstAccessor 和 Accessor 两种,分别对应于读和写。

11. 条件变量

前面说的互斥量防止多个线程,同时访问一个数据;而条件变量,更像是一种信号量之类的东西-----只有某个事件发生了之后,这个线程才能继续执行

这是 C++ 提供的另一种用于等待的同步机制,能阻塞一个或多个线程,知道收到另一个线程发出的通知或超时时,才能唤醒当前阻塞的线程。条件变量需要和互斥量配合使用。

  • std::condition_variable:A condition variable is an object able to block the calling thread until notified to resume.
    It uses a unique_lock (over a mutex) to lock the thread when one of its wait functions is called. The thread remains blocked until woken up by another thread that calls a notification function on the same condition_variable object.

11.1 条件变量:等待被唤醒

  • std::condition_variable::wait:Wait until notified
    The execution of the current thread (which shall have locked lck’s mutex) is blocked until notified.
    At the moment of blocking the thread, the function automatically calls lck.unlock(), allowing other locked threads to continue.
    Once notified (explicitly, by some other thread), the function unblocks and calls lck.lock(), leaving lck in the same state as when the function was called. Then the function returns (notice that this last mutex locking may block again the thread before returning).
    Generally, the function is notified to wake up by a call in another thread either to member notify_one or to member notify_all. But certain implementations may produce spurious wake-up calls without any of these functions being called. Therefore, users of this function shall ensure their condition for resumption is met.

  • std::condition_variable::notify_one:Notify one
    Unblocks one of the threads currently waiting for this condition.
    If no threads are waiting, the function does nothing.
    If more than one, it is unspecified which of the threads is selected.

cv.wait(lck) 将会让当前线程陷入等待
其他线程中调用 cv.notify_one() 则会唤醒那个陷入等待的线程
可以发现 std::condition_variable 必须和 std::unique_lock<std::mutex> 一起用。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

int main() {
    std::condition_variable cv;
    std::mutex mtx;

    std::thread t1([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck);

        std::cout << "t1 is awake" << std::endl;
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(400));

    std::cout << "notifying..." << std::endl;
    cv.notify_one();  // will awake t1

    t1.join();

    return 0;
}

输出为:
在这里插入图片描述
这里 std::this_thread::sleep_for() 的作用时,如果 cv.notify_one() 先运行了,这时 wait 并不会收到这个,这个线程就一直阻塞着了。

11.2 条件变量:等待某一条件成真

还可以额外指定一个参数,变成 cv.wait(lck, expr) 的形式,其中 expr 是个 lambda 表达式只有其返回值为 true 时才会真正唤醒,否则继续等待。(这操作有点太骚了。。)

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

int main() {
    std::condition_variable cv;
    std::mutex mtx;
    bool ready = false;

    std::thread t1([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck, [&] { return ready; });

        std::cout << "t1 is awake" << std::endl;
    });

    std::cout << "notifying not ready" << std::endl;
    cv.notify_one();  // useless now, since ready = false

    ready = true;
    std::cout << "notifying ready" << std::endl;
    cv.notify_one();  // awakening t1, since ready = true

    t1.join();

    return 0;
}

输出为:
在这里插入图片描述

11.3 条件变量:多个等待者

cv.notify_one() 只会唤醒其中一个等待中的线程(经过测试感觉像是随机的一个),而 cv.notify_all() 会唤醒全部。这就是为什么 wait() 需要一个 unique_lock 作为参数,因为要保证多个线程被唤醒时,只有一个能够被启动。如果不需要,在 wait() 返回后调用 lck.unlock() 即可。顺便一提,wait() 的过程中会暂时 unlock() 这个锁。

int main() {
    std::condition_variable cv;
    std::mutex mtx;

    std::thread t1([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck);
        std::cout << "t1 is awake" << std::endl;
    });

    std::thread t2([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck);
        std::cout << "t2 is awake" << std::endl;
    });

    std::thread t3([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck);
        std::cout << "t3 is awake" << std::endl;
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(400));

    std::cout << "notifying one" << std::endl;
    cv.notify_one();  // awakening t1 only

    std::this_thread::sleep_for(std::chrono::milliseconds(400));

    std::cout << "notifying all" << std::endl;
    cv.notify_all();  // awakening t1 and t2

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

输出结果为:
在这里插入图片描述

11.3.1 案例:实现生产者-消费者模式

下面这个例子类似于消息队列:
生产者:厨师,往 foods 队列里推送食品,推送后会通知消费者来用餐。
消费者:等待 foods 队列里有食品,没有食品则陷入等待,直到被通知。

int main() {
    std::condition_variable cv;
    std::mutex mtx;

    std::vector<int> foods;

    std::thread t1([&] {
        for (int i = 0; i < 2; i++) {
            std::unique_lock lck(mtx);
            cv.wait(lck, [&] {
                return foods.size() != 0;
            });
            auto food = foods.back();
            foods.pop_back();
            lck.unlock();

            std::cout << "t1 got food:" << food << std::endl;
        }
    });

    std::thread t2([&] {
        for (int i = 0; i < 2; i++) {
            std::unique_lock lck(mtx);
            cv.wait(lck, [&] {
                return foods.size() != 0;
            });
            auto food = foods.back();
            foods.pop_back();
            lck.unlock();

            std::cout << "t2 got food:" << food << std::endl;
        }
    });

    foods.push_back(42);
    foods.push_back(233);
    cv.notify_one();

    foods.push_back(666);
    foods.push_back(4399);
    cv.notify_all();

    t1.join();
    t2.join();

    return 0;
}

结果为(感觉这例子不太对劲,这样菜会上的乱七八糟的。且输出因为线程顺序,每次打印的不太一样):
在这里插入图片描述

11.3.2 条件变量:将 foods 队列封装成类

template <class T>
class MTQueue {
    std::condition_variable m_cv;
    std::mutex m_mtx;
    std::vector<T> m_arr;

public:
    T pop() {
        std::unique_lock lck(m_mtx);
        m_cv.wait(lck, [this] { return !m_arr.empty(); });
        T ret = std::move(m_arr.back());
        m_arr.pop_back();
        return ret;
    }

    auto pop_hold() {
        std::unique_lock lck(m_mtx);
        m_cv.wait(lck, [this] { return !m_arr.empty(); });
        T ret = std::move(m_arr.back());
        m_arr.pop_back();
        return std::pair(std::move(ret), std::move(lck));
    }

    void push(T val) {
        std::unique_lock lck(m_mtx);
        m_arr.push_back(std::move(val));
        m_cv.notify_one();
    }

    void push_many(std::initializer_list<T> vals) {
        std::unique_lock lck(m_mtx);
        std::copy(
                 std::move_iterator(vals.begin()),
                 std::move_iterator(vals.end()),
                 std::back_insert_iterator(m_arr));
        m_cv.notify_all();
    }
};

int main() {
    MTQueue<int> foods;

    std::thread t1([&] {
        for (int i = 0; i < 2; i++) {
            auto food = foods.pop();
            std::cout << "t1 got food:" << food << std::endl;
        }
    });

    std::thread t2([&] {
        for (int i = 0; i < 2; i++) {
            auto food = foods.pop();
            std::cout << "t2 got food:" << food << std::endl;
        }
    });

    foods.push(42);
    foods.push(233);
    foods.push_many({666, 4399});

    t1.join();
    t2.join();

    return 0;
}

输出结果为:
在这里插入图片描述

11.4 注意事项

  1. std::condition_variable 仅仅支持 std::unique_lock<std::mutex> 作为 wait 的参数,如果需要用其他类型的 mutex 锁,可以用 std::condition_variable_any
  2. 它还有 wait_for()wait_until() 函数,分别接受 chrono 时间段和时间点作为参数。详见:https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for

12. 原子操作

  • <atomic> 头文件
    Atomic types are types that encapsulate a value whose access is guaranteed to not cause data races and can be used to synchronize memory accesses among different threads.

前面都是接近 <pthread> 里的对象,当然我们也可以直接从硬件层面上去操作,这样子更高效。而且操作系统提供的 mutex,也是基于硬件层面实现的。

12.1 经典案例:多个线程修改同一个计数器

来看下面这个示例:

int main() {
    int counter = 0;

    std::thread t1([&] {
        for (int i = 0; i < 10000; i++) {
            counter += 1;
        }
    });

    std::thread t2([&] {
        for (int i = 0; i < 10000; i++) {
            counter += 1;
        }
    });

    t1.join();
    t2.join();

    std::cout << counter << std::endl;

    return 0;
}

counter += 1 看着像是一条独立指令,但多个线程同时往一个 int 变量里累加,这样肯定会出错,因为 counter += i 在 CPU 看来会变成三个指令:

  1. 读取 counter 变量到 rax 寄存器;
  2. rax 寄存器的值加上 1;
  3. 把 rax 写入到 counter 变量。

即使编译器优化成 add [counter], 1 也没用,因为现代 CPU 为了高效,使用了大量奇技淫巧,比如他会把一条汇编指令拆分成很多微指令 (micro-ops),三个甚至有点保守估计了。

问题是,如果有多个线程同时运行,顺序是不确定的:

  1. t1:读取 counter 变量,到 rax 寄存器
  2. t2:读取 counter 变量,到 rax 寄存器
  3. t1:rax 寄存器的值加上 1
  4. t2:rax 寄存器的值加上 1
  5. t1:把 rax 写入到 counter 变量
  6. t2:把 rax 写入到 counter 变量

如果是这种顺序,最后 t1 的写入就被 t2 覆盖了,从而 counter 只增加了 1,而没有像预期的那样增加 2。更不用说现代 CPU 还有高速缓存,乱序执行,指令级并行等优化策略,你根本不知道每条指令实际的先后顺序。

12.2 暴力解决:用 mutex 上锁

所以说,最暴力的解决办法就是使用 mutex 上锁。这样的确可以防止多个线程同时修改 counter 变量,从而不会冲突。

std::thread t1([&] {
    for (int i = 0; i < 10000; i++) {
        mtx.lock();
        counter += 1;
        mtx.unlock();
    }
});

std::thread t2([&] {
    for (int i = 0; i < 10000; i++) {
        mtx.lock();
        counter += 1;
        mtx.unlock();
    }
});

问题:mutex 太过重量级,它会让线程被挂起,从而需要通过系统调用,进入内核层,调度到其他线程执行,有很大的开销。(mutex是操作系统来维护的,要使用操作系统上锁,需要先进入到内核态,再进入到用户态,甚至要切换到另一个线程)

12.3 建议用 atomic:有专门的硬件指令加持

  • std::atomic: Atomic
    Objects of atomic types contain a value of a particular type (T).
    The main characteristic of atomic objects is that access to this contained value from different threads cannot cause data races (i.e., doing that is well-defined behavior, with accesses properly sequenced). Generally, for all other objects, the possibility of causing a data race for accessing the same object concurrently qualifies the operation as undefined behavior.
    Additionally, atomic objects have the ability to synchronize access to other non-atomic objects in their threads by specifying different memory orders.

这个时候就引出了更轻量级的 atomic,对它的 += 等操作,会被编译器转换成专门的指令。

CPU 识别到该指令时,会锁住内存总线,放弃乱序执行等优化策略(将该指令视为一个同步点,强制同步掉之前所有的内存操作),从而向你保证该操作是原子 (atomic)(取其不可分割之意),不会加法加到一半另一个线程插一脚进来。

对于程序员,只需把 int 改成 atomic 即可,也不必像 mutex 那样需要手动上锁解锁,因此用起来也更直观。

#include <iostream>
#include <thread>
#include <atomic>

int main() {
    std::atomic<int> counter = 0;

    std::thread t1([&] {
        for (int i = 0; i < 10000; i++) {
            counter += 1;
        }
    });

    std::thread t2([&] {
        for (int i = 0; i < 10000; i++) {
            counter += 1;
        }
    });

    t1.join();
    t2.join();

    std::cout << counter << std::endl;

    return 0;
}

12.3.1 注意:请用 +=,不要让 + 和 = 分开(运算符重载)

不过要注意了,这种写法:

  1. counter = counter + 1; // 错,不能保证原子性
  2. counter += 1; // OK,能保证原子性
  3. counter++; // OK,能保证原子性

比如说使用如下加法,得到的最终结果是不等于20000的:

std::thread t1([&] {
    for (int i = 0; i < 10000; i++) {
        counter = counter + 1;
    }
});

std::thread t2([&] {
    for (int i = 0; i < 10000; i++) {
        counter = counter + 1;
    }
});

12.3.2 调用函数名

除了用方便的运算符重载之外,还可以直接调用相应的函数名,比如:

  • fetch_add 对应于 +=
  • store 对应于 =
  • load 用于读取其中的 int 值
int main() {
    std::atomic<int> counter;
    counter.store(0);

    std::thread t1([&] {
        for (int i = 0; i < 10000; i++) {
            counter.fetch_add(1);
        }
    });

    std::thread t2([&] {
        for (int i = 0; i < 10000; i++) {
            counter.fetch_add(1);
        }
    });

    t1.join();
    t2.join();

    std::cout << counter.load() << std::endl;

    return 0;
}

12.3.3 fetch_add:会返回其旧值

int old = atm.fetch_add(val)

除了会导致 atm 的值增加 val 外,还会返回 atm 增加前的值,存储到 old。这个特点使得它可以用于并行地往一个列表里追加数据:追加写入的索引就是 fetch_add 返回的旧值。
当然这里也可以 counter++,不过要追加多个的话还是得用到 counter.fetch_add(n)。

int main() {
    std::atomic<int> counter;
    counter.store(0);

    std::vector<int> data(20000);

    std::thread t1([&] {
        for (int i = 0; i < 10000; i++) {
            int index = counter.fetch_add(1);
            data[index] = i;
        }
    });

    std::thread t2([&] {
        for (int i = 0; i < 10000; i++) {
            int index = counter.fetch_add(1);
            data[index] = i + 10000;
        }
    });

    t1.join();
    t2.join();

    std::cout << data[10000] << std::endl;

    return 0;
}

12.3.4 exchange:读取的同时写入

exchange(val) 会把 val 写入原子变量,同时返回其旧的值。

int main() {
    std::atomic<int> counter;

    counter.store(0);

    int old = counter.exchange(3);
    std::cout << "old=" << old << std::endl;  // 0

    int now = counter.load();
    std::cout << "cnt=" << now << std::endl;  // 3

    return 0;
}

12.3.5 compare_exchange_strong:读取,比较是否相等,相等则写入

compare_exchange_strong(old, val) 会读取原子变量的值,比较它是否和 old 相等:

  • 如果不相等,则把原子变量的值写入 old。
  • 如果相等,则把 val 写入原子变量(在下面例子中为counter)。
  • 返回一个 bool 值,表示是否相等。

注意 old 这里传的其实是一个引用,因此 compare_exchange_strong 可修改他的值。

int main() {
    boolalpha(std::cout);
    std::atomic<int> counter;

    counter.store(2);

    int old = 1;
    bool equal = counter.compare_exchange_strong(old, 3);
    std::cout << "equal=" << equal << std::endl;  // false
    std::cout << "old=" << old << std::endl;  // 2

    int now = counter.load();
    std::cout << "cnt=" << now << std::endl;  // 3

    old = 2;
    equal = counter.compare_exchange_strong(old, 3);
    std::cout << "equal=" << equal << std::endl;  // true
    std::cout << "old=" << old << std::endl;  // 1

    now = counter.load();
    std::cout << "cnt=" << now << std::endl;  // 3

    return 0;
}

12.3.6 方便理解的伪代码

为了方便理解,可以假想 atomic 里面是这样实现的:

template <class T>
struct atomic {
    std::mutex mtx;
    int cnt;

    int store(int val) {
        std::lock_guard _(mtx);
        cnt = val;
    }

    int load() const {
        std::lock_guard _(mtx);
        return cnt;
    }

    int fetch_add(int val) {
        std::lock_guard _(mtx);
        int old = cnt;
        cnt += val;
        return old;
    }

    int exchange(int val) {
        std::lock_guard _(mtx);
        int old = cnt;
        cnt = val;
        return old;
    }

    bool compare_exchange_strong(int &old, int val) {
        std::lock_guard _(mtx);
        if (cnt == old) {
            cnt = val;
            return true;
        } else {
            old = cnt;
            return false;
        }
    }
};

可以看到其中 compare_exchange_strong 的逻辑最为复杂,一般简称 CAS (compare-and-swap),他是并行编程最常用的原子操作之一。实际上任何 atomic 操作,包括 fetch_add,都可以基于 CAS 来实现:这就是 Taichi 实现浮点数 atomic_add 的方法。

13. 补充一些常用情形

13.1 关于 thread 中类的使用

有时类需要将该成员传递进 thread,所以特别写写它的用法 thread 可调用的方式如下:

#include <iostream>
#include <thread>
using namespace std;
class A {
public:
	void f(int x, char c);
	int operator()(int m) { return m*m; }
};
void foo(int x){}
int main() {
	A a;
	thread t1(a, 6);//传递a的拷贝给子线程
	thread t2(ref(a), 6);//传递a的引用给子线程
	thread t3(move(a), 6);//将a从主线程移动到子线程,在主线程中a已经失效了
	thread t4(A(), 6);//传递临时创建的a对象给子线程
 
	thread t5(foo, 6);//传递函数
	thread t6([](int x) {return x*x; }, 6);//传递lambda函数
	thread t7(&A::f, a,8 ,'w');//传递a的拷贝成员函数给子线程
	thread t8(&A::f, &a, 8, 'w');//传递a的地址成员函数给子线程
 
	async(a,6);//同样适用于上述的8种方式
	return 0;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C++ 并行编程(thread)---多线程 的相关文章

随机推荐

  • 机器学习第一周(一)--机器学习引入

    机器学习 监督学习 无监督学习 机器学习 1 机器学习算法主要分为俩大类 监督学习 Supervised Learning 和无监督学习 Unsupervised Learning 监督学习与无监督学习一个最大不同是监督学习的数据是带标签的
  • 调用百度地图5.0出现java.lang.UnsatisfiedLinkError: No implementation found for int

    转载请注明 http blog csdn net seven2729 article details 48289101 调用百度地图 5 0手机报错 黑屏并闪退 报错日志 java lang UnsatisfiedLinkError No
  • java——SpringBoot后台Controller接收数组参数

    1 前台传数组 var ids 1 2 3 4 5 6 ajax url postArray type post dateType json data ids ids success function res console log 数据
  • 状态码415解决

    状态码415 状态码415这是个什么鬼 常见的转态码 是200 204 206 301 302 303 304 400 401 403 404 405 500 503这个HTTP 协议原生的状态码 自己项目封装指定的那就另当别论 415很少
  • git报错“remote: HTTP Basic: Access denied”的解决方法

    使用git推送项目代码或者下载项目代码时出现报错 remote HTTP Basic Access denied 很多时候都是由于用户名和密码没有和注册所填信息保持一致造成的 解决方法是首先在git中输入 git config system
  • IC学习笔记6——单比特信号的跨时钟域处理方法之“打两拍”

    一 打两拍 对于单比特信号的跨时钟域处理问题 通常使用两级的寄存器来同步源寄存器的信号 这样的方法简称打两拍 1 1 电路波形图 如上图所示从源寄存器传递过来的信号adata没有满足目的寄存器的建立和保持时间 发生亚稳态 但是绝大多数的时候
  • 一行代码搞定Http请求,强得离谱~

    大家好 我是乔哥 OKHttpUtil 在Java的世界中 Http客户端之前一直是Apache家的HttpClient占据主导 但是由于此包较为庞大 API又比较难用 因此并不使用很多场景 而新兴的OkHttp Jodd http固然好用
  • jenkins配置测试邮件发送成功但构建邮件发送失败

    以下失败原因只是其中之一 每个人遇到的可能并不一样 希望本文能提供解决思路 在配置jenkins发送邮件的时候按照网上查询的各个步骤配置完毕 邮件测试也可以发送成功 但是当进行项目构建时 日志提示邮件已经发送 但实际并未收到邮件 使用的是Q
  • git clone 下载 其他分支

    总是记不住 可能是因为用得少 如果 已经 clone了 master分支 方法 1 那么 本地 git pull 然后执行 git checkout b 本地分支名 origin 远程分支名 这样就能下载 到远程分支 并建立本地关联 方法2
  • 【模拟】不高兴的津津

    题目 不高兴的津津 rqnoj20 题目描述 津津上初中了 妈妈认为津津应该更加用功学习 所以津津除了上学之外 还要参加妈妈为她报名的各科复习班 另外每周妈妈还会送她去学习朗诵 舞蹈和钢琴 但是津津如果一天上课超过八个小时就会不高兴 而且
  • loadrunner agent process进程

    LoadRunner Agent Processer是LR的代理进程 有很大的用处 比如当一台机器要分担一定 负载的时候 这个LoadRunner Agent Processer是必须启动的 在LR安装后一般是开机启动的 可以关掉 就象个地
  • git 使用总结

    1 本地安装git 略 2 创建github账号 略 3 本地配置 配置用户名和邮箱 git config global user name xiaobuisme git config global user email 81954469
  • Codemonkey 编码冒险课程

    转自 https blog csdn net mmh19891113 article details 80704745 Codemonkey 编码冒险课程 1 200 关卡 我们并没有按照他们官方的来划分关卡 官方是1 100 101 20
  • 在struts框架下实现文件的上传

    由于jspsmartupload上传文件 当前端页面没有file控件时 后端用jspsmartupload控件upload时将会走入一个死循环 现在采用struts自己提供的功能实现文件的上传 1 前端页面upload jsp
  • 使用Process Monitor工具监测进程对注册表和文件的操作

    使用Process Monitor工具监测进程对注册表和文件的操作 在C C 中编写代码实现 Process Monitor是一款功能强大的Windows系统工具 它可以用于监测和记录系统中的进程对注册表和文件的操作 通过使用Process
  • SQL注入攻击介绍

    SQL注入攻击介绍 一 SQL注入攻击简介 SQL注入攻击是指 后台数据库操作时 如果拼接外部参数到SQL语句中 就可能导致欺骗服务器执行恶意的SQL语句 造成数据泄露 删库 页面篡改等严重后果 按变量类型分为 数字型 字符型 按HTTP提
  • tomcat开启远程管理Manager

    启动tomcat 点击Manager App 403错误 根据提示 有两个地方需要修改 一个是开启允许远程访问 否则只能本机访问 另一个是打开manager gui 添加用户权限 1 开启远程访问 两种方式 a 打开若没有则新建 conf
  • Elasticsearch Java API四种实现方式

    0 题记 之前Elasticsearch的应用比较多 但大多集中在关系型 非关系型数据库与Elasticsearch之间的同步 以上内容完成了Elasticsearch所需要的基础数据量的供给 但想要在海量的数据中找到和自己相关的业务数据
  • 使用hbuilderx开发小程序项目遇到的问题

    因为在hbuilderx打开项目 文件结构与开发者工具中打开不一致 例如hbuilderx中只有一个 vue文件 开发者工具中则是四个文件wxml wxss js json分别对应结构 样式 代码逻辑 和组件页面配置 配置组件相关 在hbu
  • C++ 并行编程(thread)---多线程

    C 并行编程 多线程 1 并发与并行 2 进程和线程 2 1 常规解释 2 2 总结 2 3 具体理解 2 4 为什么使用多线程 2 5 进程和线程的区别 3 C 中的多线程 3 1 存储持续性 补充 4 从头文件