C++11的半同步半异步线程池

2023-11-10

简介

半同步半异步线程池用的比较多,实现也比较简单。

其中同步层包括同步服务层和排队层,指的是将接收的任务排队,将所有的任务排队到一个队列中,等待处理;

异步层指多个线程处理任务,异步处理层从同步层取出任务,并发处理任务。

在这里插入图片描述

同步队列

同步队列属于同步层的内容,主要作用是保证队列中共享数据线程安全,同时也提供新增任务的接口,以及提供取任务的接口。

这里使用C++11的锁、条件变量、右值引用、std::move和std::forward来实现。

同步队列主要包括三个函数,Take、Add和Stop。

Take函数

这里实现重载了两个Take函数,可支持一次获取多个任务,或者一次获取一个任务。

//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		list = std::move(m_queue);
		m_notFull.notify_one();
	}
	//获取单个任务
	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}

先创建一个unique *lock 获取 mutex,然后再通过条件变量 m_*notEmpty 来等待判断式。判断式由两个条件组成,一个是停止的标志,另一个是不为空的条件,当不满足任何一个条件时,条件变量会释放 mutex 并将线程置于 waiting 状态,等待其他线程调用 notify_one/notify all 将其唤醒;当满足任何一个条件时,则继续往下执行后面的逻辑,即将队列中的任务取出,并唤醒一个正处于等待状态的添加任务的线程去添加任务。当处于 waiting 状态的线程被 notify_one 或notify all 唤醒时,条件变量会先重新获取 mutex,然后再检查条件是否满足,如果满足,则往下执行,如果不满足,则释放 mutex 继续等待。

Add函数

Add 的过程和 Take 的过程是类似的,也是先获取 mutex,然后检查条件是否满足,不满足条件时,释放 mutex 继续等待,如果满足条件,则将新的任务插入到队列中,并唤醒取任务的线程去取数据。

template<typename F>
	void Add(F &&x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;
		m_queue.emplace_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}

Stop函数

Stop 函数先获取 mutex,然后将停止标志置为 true。注意,为了保证线程安全,这里需要先获取 mutex,在将其标志置为 true 之后,再唤醒所有等待的线,因为等待的条件是m_needStop,并且满足条件,所以线程会继续往下执行。由于线程在 m_needStop 为 true 时会退出,所以所有的等待线程会相继退出。

另外一个值得注意的地方是,我们把 m notFull.notify_all0放到lock_guard 保护范围之外了,这里也可以将 m_notFull.notify all0)放到ockguard保护范围之内,放到外面是为了做一点优化。因为 notify_one 或 notify_all 会唤醒一个在等待的线程,线程被唤醒后会先获取 mutex 再检查条件是否满足,如果这时被 lock guard保护,被唤醒的线程则需要 lock guard 析构释放 mutex 才能获取(即stop函数执行完了才释放)。如果在 lock_guard 之外notify_one 或notify_all,被唤醒的线程获取锁的时候不需要等待 lock_guard 释放锁,性能会好一点,所以在执行 notify_one或notify_all 时不需要加锁保护。

void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}

SyncQueue完整代码

”SyncQueue.h”

同步队列整体代码:

#pragma once
#include <iostream>
#include <list>
#include <mutex>

using namespace std;

template<typename T>
class SyncQueue
{
public:
	SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
	{
	}
	void Put(const T &x)
	{
		Add(x);
	}

	void Put(T &&x)
	{
		Add(std::forward<T>(x));
	}
	//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		list = std::move(m_queue);
		m_notFull.notify_one();
	}
	//获取单个任务
	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}
	void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}
	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	bool Full()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() == m_maxSize;
	}
	//可以获取任务数量
	int Count()
	{
		return m_queue.size();
	}
private:
	bool NotFull() const
	{
		bool full = m_queue.size() >= m_maxSize;
		if (full)
		{
			cout << "缓冲区满了,需要等待。。。" << endl;
		}
		return !full;
	}
	bool NotEmpty() const
	{
		bool empty = m_queue.empty();
		if (empty)
		{
			cout << "缓冲区空了,需要等待。。。,异步层的线程ID:" << this_thread::get_id() << endl;
		}
		return !empty;
	}
	template<typename F>
	void Add(F &&x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;
		m_queue.emplace_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}
private:
	std::list<T> m_queue; //缓冲区
	std::mutex m_mutex; //互斥量
	std::condition_variable m_notEmpty; //不为空的条件变量
	std::condition_variable m_notFull; //没有满的条件变量
	int m_maxSize; //同步队列最大的size
	bool m_needStop; //停止的标志
};

线程池

“ThreadPool.h”

线程池ThreadPool有3个成员变量,一个是线程组,这个线程组中的线程是预先创建的,应该创建多少个线程由外面传人,一般建议创建 CPU 核数的线程以达到最优的效率,线程组循环从同步队列中取出任务并执行,如果线程池为空,线程组将处于等待状态,等待任务的到来。

另一个成员变量是同步队列,它不仅用来做线程同步,还用来限制同步队列的上限,这个上限也是由使用者设置的。

第三个成员变量是用来停止线程池的,为了保证线程安全,我们用到了原子变量 atomic bool。下一节中将展示使用这个半同步半异步的线程池的实例。

#include<list>
#include<thread>
#include<functional>
#include<memory>
#include<atomic>
#include "SyncQueue.h"

const int MaxTaskCount = 100;
class ThreadPool
{
public:
	using Task = std::function<void()>;
	ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
	{
		Start(numThreads);
	}
	~ThreadPool(void)
	{
		Stop();
	}
	void Stop()
	{
		//保证多线程情况下只调用一次 StopThreadGroup
		std::call_once(m_flag, [this] {StopThreadGroup(); });
	}
	//可输入右值,例如lambda表达式
	void AddTask(Task&& task)
	{
		m_queue.Put(std::forward<Task>(task));
	}
	void AddTask(const Task& task)
	{
		m_queue.Put(task);
	}
	void Start(int numThreads)
	{
		m_running = true;
		//创建线程组
		for (int i = 0; i < numThreads; ++i)
		{
			m_threadgroup.emplace_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
		}
	}
private:
	
	void RunInThread()
	{
		while (m_running)
		{
			//取任务分别执行
			std::list<Task> list;
			m_queue.Take(list);

			for (auto& task : list)
			{
				if (!m_running)
					return;
				task();
			}
		}
	}
	void StopThreadGroup()
	{
		m_queue.Stop(); //让同步队列中的线程停止
		m_running = false; //置为false,让内部线程跳出循环并退出

		for (auto thread : m_threadgroup)
		{
			if (thread)
				thread->join();
		}
		m_threadgroup.clear();

	}
	std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
	SyncQueue<Task> m_queue; //同步队列
	atomic_bool m_running; //是否停止的标志
	std::once_flag m_flag;
};

主函数测试

#include <iostream>
#include "ThreadPool.h"
using namespace std;

void TestThdPool()
{
	ThreadPool pool(2);//创建一个2个线程的线程池

	//创建一个线程来添加10个任务1
	std::thread thd1([&pool] {
		for (int i = 0; i < 10; i++)
		{
			auto thdId = this_thread::get_id();
			pool.AddTask([thdId] {//添加任务可以使用lambda表达式,代码中实现了右值作为参数输入
				cout << "同步线程1的线程ID:" << thdId << endl;
			});
		}
	});
	//创建一个线程来添加20个任务2
	std::thread thd2([&pool] {
		for (int i = 0; i < 20; i++)
		{
			auto thdId = this_thread::get_id();
			pool.AddTask([thdId] {
				cout << "同步线程2的线程ID:" << thdId << endl;
			});
		}
	});

	this_thread::sleep_for(std::chrono::seconds(2));
	getchar();
	pool.Stop();
	thd1.join();
	thd2.join();
}
int main()
{
	TestThdPool();
	return 0;
}

运行结果:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C++11的半同步半异步线程池 的相关文章

  • 提升mapped_file_source、对齐方式和页面大小

    我正在尝试在性能很重要的上下文中解析一些大小高达几百兆字节的文本文件 因此我使用 boostmapped file source 解析器期望源以空字节终止 因此我想检查文件大小是否是页面大小的精确倍数 如果是 则使用较慢的非内存映射方法 我
  • 为什么 std::function 不是有效的模板参数,而函数指针却是?

    我已经定义了名为的类模板CallBackAtInit其唯一目的是在初始化时调用函数 构造函数 该函数在模板参数中指定 问题是模板不接受std function作为参数 但它们接受函数指针 为什么 这是我的代码 include
  • 编写具有多种类型的泛型扩展方法时的类型推断问题

    我正在为 IEnumerable 编写一个通用扩展方法 用于将对象列表映射到另一个映射对象列表 这就是我希望该方法的工作方式 IList
  • 嵌套字段的 Comparator.comparing(...)

    假设我有一个这样的域模型 class Lecture Course course getters class Course Teacher teacher int studentSize getters class Teacher int
  • 从点云检测平面集

    我有一组点云 我想测试3D房间中是否有角落 所以我想讨论一下我的方法 以及在速度方面是否有更好的方法 因为我想在手机上测试它 我将尝试使用霍夫变换来检测线 然后我将尝试查看是否有三条线相交 并且它们也形成了两个相交的平面 如果点云数据来自深
  • 如何从字符串中解析一个大整数? [复制]

    这个问题在这里已经有答案了 我有一个这样的方法 Integer parseInt myInt 不是这个整数变得很长 我得到以下异常 java lang NumberFormatException For input string 40001
  • MSChart 控件中的自定义 X/Y 网格线

    我有一个带有简单 2D 折线图的 C Windows 窗体 我想向其中添加自定义 X 或 Y 轴标记 并绘制自定义网格线 例如 以突出显示的颜色 虚线 我查看了 customLabels 属性 但这似乎覆盖了我仍然想显示的默认网格 这是为了
  • 如何从intellij项目视图中隐藏不必要的文件?

    给定一个示例 gradle 项目 其项目结构如下所示 正如你所看到的 有很多东西你实际上不需要在想法中看到 但你需要它们存在 我知道下面被忽略的文件 文件夹类型Editor File Types但这些正在影响库和项目 idea 会在各处忽略
  • C 与 C++ 中的 JNI 调用不同?

    所以我有以下使用 Java 本机接口的 C 代码 但是我想将其转换为 C 但不知道如何转换 include
  • HTTP 状态 405 - 此 URL java servlet 不支持 HTTP 方法 POST [重复]

    这个问题在这里已经有答案了 我无法使页面正常工作 我有要发布的表单方法和我的 servlet 实现doPost 然而 它不断地向我表明我并不支持POST方法 我只是想做一个简单的网站并将值插入到我的 MySQL 数据库中 type Stat
  • 在 Qt 中播放通知(频率 x)声音 - 最简单的方法?

    Qt 5 1 或更高版本 我需要播放频率为 x 的通知声音 n 毫秒 如果我能像这样组合音调那就太好了 1000Hz 持续 2 秒 然后 3000Hz 持续 1 秒 最简单的方法是使用文件 WAV MP3 例如如此处所述 如何用Qt播放声音
  • 删除 JFX 中选项卡后面的灰色背景

    So is there any way to remove the gray area behind the tab s 我尝试过用 CSS 来做到这一点 但没有找到方法 要设置 tabpane 标题的背景颜色 请在 CSS 文件中写入 t
  • 如何调用与现有方法同名的扩展方法? [复制]

    这个问题在这里已经有答案了 我有这样的代码 public class TestA public string ColA get set public string ColB get set public string ColC get se
  • 不使用放置 new 返回的指针时的 C++ 严格别名

    这可能会导致未定义的行为吗 uint8 t storage 4 We assume storage is properly aligned here int32 t intPtr new void storage int32 t 4 I k
  • 时间:2019-03-17 标签:c#TimerStopConfusion

    我想通过单击按钮时更改文本颜色来将文本框文本设置为 闪烁 我可以让文本按照我想要的方式闪烁 但我希望它在闪烁几次后停止 我不知道如何在计时器触发几次后让它停止 这是我的代码 public Form1 InitializeComponent
  • java中使用多线程调用同一类的不同方法

    我有一个类 如下所示 具有三种方法 public class MyRunnable implements Runnable Override public void run what code need to write here to c
  • java.lang.IllegalStateException - 提交响应后无法创建会话

    我在我的项目中使用 JSF PrimeFaces 我为此准备了一个Maven项目 当我编译项目并加载主页后 我收到以下异常 java lang IllegalStateException Cannot create a session af
  • 如何使用自定义 JDK 构建 Jenkins 项目?

    我有一个常规的 Jenkins 实例 运行一些多分支管道 该实例在 JDK 11 上运行 因为 Jenkins 并不真正支持更高版本 没关系 但不好的是 我的所有管道似乎也都受到 Java 11 的限制 Jenkins 仅使用它自己也使用的
  • Emacs C++,打开相应的头文件

    我是 emacs 新手 我想知道 是否有在头文件 源文件和相应的源文件 头文件之间切换的快捷方式 是否有像通用 emacs 参考卡那样的参考卡 Thanks There s ff find other file 您可以使用以下方法将其绑定到
  • IDisposable 的显式实现

    虽然有很多关于IDisposable在 SO 上找到 我还没有找到答案 我通常遵循这样的做法 当我的一个班级拥有一个IDisposable对象然后它也实现IDisposable并打电话Dispose在拥有的对象上 然而最近我遇到了一个类 它

随机推荐

  • GPGGA数据解析

    此文为转载其他博主的 由于没有注明转载出处 所以不从得知 谢谢原文作者 NMEA数据如下 GPGGA 121252 000 3937 3032 N 11611 6046 E 1 05 2 0 45 9 M 5 7 M 0000 77 GPR
  • 自定义数组的工具类

    1 创建ArrayUtilTest类 用于编写要实现数组的功能的方法 public class ArrayUtilTest 求数组的最大值 public int getMax int arr int max 0 for int i 0 i
  • cv2和PIL.Image之间的转换

    PIL Image转换成OpenCV格式 import cv2 from PIL import Image import numpy image Image open plane jpg image show img cv2 cvtColo
  • 搭建一个单节点的k8s集群

    首先安装kubectl kubeadm kubelet 关闭文件交换 sudo swapoff a 创建单节点集群 kubeadm init pod network cidr 192 168 0 0 16 kubernetes versio
  • OpenCV中的霍夫线变换、概率霍夫线变换

    OpenCV中的霍夫线变换 概率霍夫线变换 1 效果图 2 原理 2 1 什么是霍夫变换 2 2 什么是概率霍夫变换 3 源码 3 1 霍夫变换 3 2 概率霍夫变换 参考 这篇博客将介绍Python OpenCV中的霍夫变换 包括什么是霍
  • JavaScript的变量类型

    JavaScript的变量类型 JavaScript的数据类型分为两种 1 值类型 基本数据类型 字符串 String 数字 Number 布尔 Boolean 未定义 Undefined 空 Null Symbol 2 引用数据类型 对象
  • 应用层通过/sys/class/gpio文件操作gpio口

    1 内核gpio子系统介绍 应用层通过sysfs操作gpio的前提是内核中已经向gpio子系统注册了gpio资源 并且在 sys class 目录下可以看到gpio类 详细情况参考博客 2 6 35内核的gpio子系统详解 2 sys cl
  • 大数据学习——linux系统的网卡配置步骤

    ifconfig 查看ip 没有ip时需要配置 配置步骤 1输入命令setup 选择network configuration 选择runtool 选择device configuration 选择eth0 修改Use DHCP 把 用空格
  • (2020最新)CentOS7 解决登录MySQL后无mysql系统表问题

    前言 今天安装了MySQL 本来可以避免这个问题的 但是由于我的一时疏忽 整出了这个错误 那就解决呗 错误起因 我是按照这篇博客装的 https blog csdn net qq 43437122 article details 10355
  • vue 项目中页面打印实现(去除页眉页脚)

    vue 项目中页面打印实现 参考文章 13 Paged media 项目描述 背景 框架vue 组件 element ui 已有一个在用的后台管理系统 需求 现需在列表页面添加按钮 打印协议 并且在点击按钮以后 进入打印页面 确认无误后在打
  • 两个数组找出同时包含的数字

    问题 有两个int32位的无序数组找出同时包含的数字 数组长度分别为M和N 思路一 暴力搜索 采用循环遍历找出相同的数值 def find same array1 array2 found for i in array1 for j in
  • matlab newff激活函数,matlab神经网络newff函数的用法

    设 P T 是训练样本 X Y 是测试样本 net newrb P T err goal spread 建立网络 q sim net p e q T plot p q 画训练误差曲线 q sim net X e q Y plot X q 画
  • 由于找不到MSVCR120.dll,无法继续执行代码

    原因 这是因为 MSVCR120 dll 这个dll程序消失了 所以会出现这样的情况 解决方法 下载 MSVCR120 dll 这个dll文件 下载地址 DLL文件下载 在搜索框中输入确实的DLL文件 点击搜索即可 如下图 点击文件名 往下
  • 1.Django安装和项目创建

    Django 框架是用Python语言开发的 所以安装Django 就像安装其他的 Python库一样 执行如下命令即可 pip install django 你可以执行如下命令检查Django是否安装好 并且查看安装的Django版本 p
  • Web自动化测试,怎样断言和形成报告?

    目录 1 自动化断言 1 1断言概念 1 2断言介绍 1 3代码案例 2 自动化报告 2 1HTMLTestRunner 2 1 1HTMLTestRunner 的下载和安装 2 1 2HTMLTestRunner使用 2 2Beautif
  • 图像噪声与图像信噪比(一)

    图像噪声与图像信噪比 一 噪声是指图像密度的随机变动 具体指的是胶片的颗粒或者说数字图像上像素级的变动 这是一个关键的图像质量因素 和图像清晰度一样重要 它和图像的动态范围相关性较强 即一定亮度范围内相机可以提供的优秀的信噪比和反差 因为它
  • 应急响应流程及windows/linux用到的命令

    应急响应流程 1 收集信息 搜集客户信息和中毒信息 备份 2 判断类型 判断是否是安全事件 是何种安全事件 勒索病毒 挖矿 断网 ddos等 3 深入分析 日志分析 进程分析 启动项分析 样本分析 4 清理处置 杀掉恶意进程 删除恶意文件
  • #今日论文推荐# ECCV 2022

    今日论文推荐 ECCV 2022 旷视提出半监督目标检测模型Dense Teacher 取得SOTA性能 这篇论文提出了一个新的半监督目标检测模型 Dense Teacher 推翻了当前流行的用 thresholding 生成 hard p
  • virtualBox桥接模式下openEuler镜像修改IP地址、openEule修改IP地址、openEule设置IP地址

    安装好openEuler后 设置远程登入前 必不可少的一步 主机与虚拟机之间的通信要解决 下面给出详细步骤 第一步 检查虚拟机适配器模式 桥接模式 第二步 登入虚拟机修改IP cd etc sysconfig network scripts
  • C++11的半同步半异步线程池

    C 11的半同步半异步线程池 简介 同步队列 Take函数 Add函数 Stop函数 SyncQueue完整代码 线程池 主函数测试 简介 半同步半异步线程池用的比较多 实现也比较简单 其中同步层包括同步服务层和排队层 指的是将接收的任务排