项目优化>C++,concurrentqueue(高性能并发队列)

2023-05-16

项目中的数据队列基于轮询和selep的实时性及CUP性能差,需要进行优化,尝试使用concurrentqueue进行优化。网上有一些资料介绍,可供参考。


使用后的个人理解:一个线程安全的queue,并且concurrentqueue的线程安全并不是一味的加锁,它有特殊的技巧,总的来说线程安全且高效但是不保证数据的有序性。是一个很nice的MQ。分析需求,使用阻塞队列,一旦有数据便被读取了,所以使用concurrentqueue可行。
使用

方法功能
ConcurrentQueue(size_t initialSizeEstimate)构造函数,它可选地接受队列将包含的元素数量的估计值
enqueue(T&& item)将一个项目排队,必要时分配额外的空间
try_enqueue(T&& item)将一个项目入队,但前提是已经分配了足够的内存,返回值为bool类型
try_dequeue(T& item)将一个项目出队,如果找到一个项目,则返回true;如果队列为空,则返回false
(size_t size_approx()返回元素个数
方法作用
bool try_dequeue_from_producer(producer_token_t const& producer, U& item)从特定的生产者队列出列
size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)批量出列
bool is_lock_free()检测是否是原子操作

concurrentqueue的API

封装于:“concurrentqueue.h”

# Allocates more memory if necessary
enqueue(item) : bool
enqueue(prod_token, item) : bool
enqueue_bulk(item_first, count) : bool
enqueue_bulk(prod_token, item_first, count) : bool

# Fails if not enough memory to enqueue
try_enqueue(item) : bool
try_enqueue(prod_token, item) : bool
try_enqueue_bulk(item_first, count) : bool
try_enqueue_bulk(prod_token, item_first, count) : bool

# Attempts to dequeue from the queue (never allocates)
try_dequeue(item&) : bool
try_dequeue(cons_token, item&) : bool
try_dequeue_bulk(item_first, max) : size_t
try_dequeue_bulk(cons_token, item_first, max) : size_t

# If you happen to know which producer you want to dequeue from
try_dequeue_from_producer(prod_token, item&) : bool
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t

# A not-necessarily-accurate count of the total number of elements
size_approx() : size_t

简单示例

#include<iostream>
#include"concurrentqueue.h"
int main()
{
	int x=0,y=0,z=0;
	moodycamel::ConcurrentQueue<int> q;
	q.enqueue(2);
	q.enqueue(4);
	q.enqueue(6);
	q.try_dequeue(x);
	std::cout << q.size_approx() <<"  "<<x<< std::endl;
	if (q.is_lock_free)
		std::cout << "is lok free" << std::endl;
	return 0;
}

  • 需要注意的是concurrentqueue不保证数据的有序性
#include<iostream>
#include<string>
#include<thread>
#include<mutex>
#include"concurrentqueue.h"

class MintorAbstractBuffer
{
public:
	MintorAbstractBuffer(const uint32_t buffersize)
		:_Buffersize(buffersize)
	{}
	bool push(std::string s)
	{
		int32_t size = _buffer.size_approx();
		if (_Buffersize > size)
		{
			if (_buffer.enqueue(s))
			{
				++_Size;
				return true;
			}
		}
		return false;
	}
	bool RecursivePush(std::string s,int32_t num)
	{
		//std::lock_guard<std::mutex> lock(_mut);
		for (int32_t i = 0; i < num; ++i)
		{
			int32_t size = _buffer.size_approx();
			if (size >= _Buffersize)
			{
				std::cout << "Push Error" << " ";
				return false;
			}
			_buffer.enqueue(s);
			++_Size;
		}
		return true;
	}
	void pint()
	{
		int32_t size = _buffer.size_approx();
		for (int32_t i = 0; i <= size; ++i)
		{
			std::string curs;
			if (!_buffer.try_dequeue(curs))
				std::cout << "Pint Error" << std::endl;
			std::cout << curs<< i <<" ";
			if (!_buffer.enqueue(curs))
				std::cout << "Pint Error" << std::endl;
		}
		std::cout << std::endl;
		std::cout <<_buffer.size_approx()<< std::endl;
		std::cout <<_Size<< std::endl;
	}
private:
	std::mutex _mut;
	int32_t _Buffersize;
	moodycamel::ConcurrentQueue<std::string> _buffer;
	uint32_t _Size=0;
};

int main()
{
	std::string s1 = "a", s2 = "b", s3 = "c";
	MintorAbstractBuffer buffer(100000);
	std::thread t1(&MintorAbstractBuffer::RecursivePush, &buffer, s1, 333);
	std::thread t2(&MintorAbstractBuffer::RecursivePush, &buffer, s2, 333);
	std::thread t3(&MintorAbstractBuffer::RecursivePush, &buffer, s3, 334);
	t1.join();t2.join();t3.join();
	buffer.pint();
	return 0;
}

在这里插入图片描述
加锁之后使得++size不会被打断:
在这里插入图片描述
_size是cont的push次数,在++size过程中可能被其他线程的语句终止导致并没有++size,大概是1%的终止率。
想要数据有序还是要使用STL::queue,或者使用ConcurrentQueue的阻塞版本,见文末

#include<iostream>
#include<string>
#include<thread>
#include<mutex>
#include<queue>

class MintorAbstractBuffer
{
public:
	MintorAbstractBuffer(const uint32_t buffersize)
		:_Buffersize(buffersize)
	{}
	bool push(std::string s)
	{
		int32_t size = _buffer.size();
		if (_Buffersize > size)
		{
			_buffer.push(s);
			++_Size;
			return true;
		}
		return false;
	}
	bool RecursivePush(std::string s,int32_t num)
	{
		std::lock_guard<std::mutex> lock(_mut);
		for (int32_t i = 0; i < num; ++i)
		{
			int32_t size = _buffer.size();
			if (size > _Buffersize)
			{
				std::cout << "Push Error" << " ";
				return false;
			}
			_buffer.push(s);
			++_Size;
		}
		return true;
	}
	void pint()
	{
		int32_t size = _buffer.size();
		for (int32_t i = 0; i <= size; ++i)
		{
			std::cout << i;
			std::cout << _buffer.front() << " ";
			std::string curs;
			curs = _buffer.front();
			_buffer.pop();
			_buffer.push(curs);
			
		}
		std::cout <<std::endl;
		std::cout << "push cont :" << _Size << std::endl;
		std::cout << "_buffer.size() :" << _buffer.size() << std::endl;
	}

private:
	std::mutex _mut;
	int32_t _Buffersize;
	std::queue<std::string> _buffer;
	uint32_t _Size=0;
};

int main()
{
	std::string s1 = "a", s2 = "b", s3 = "c";
	MintorAbstractBuffer buffer(100000);
	int32_t x=3333;
	std::thread t1(&MintorAbstractBuffer::RecursivePush, &buffer, s1, x);
	std::thread t2(&MintorAbstractBuffer::RecursivePush, &buffer, s2, x);
	std::thread t3(&MintorAbstractBuffer::RecursivePush, &buffer, s3, x+1);
	t1.join();t2.join();t3.join();
	buffer.pint();
	return 0;
}

在这里插入图片描述


阻塞版本(blockingconcurrentqueue)的API

封装于:“blockingconcurrentqueue.h”,在工程中使用一个线程不断使用阻塞版本API调用数据可以提升数据的处理速度(缓冲区基本为空)。当然需要知道阻塞与非阻塞队列的区别,下面的API可以通过调整参数来控制多长时间从阻塞的队列中重新获取数据。github上有详细的介绍,出队操作有更改API如下。

wait_dequeue(U& item) : void
wait_dequeue_timed(U& item, std::int64_t timeout_usecs) : bool //时间参数 std::chrono::milliseconds(5)
wait_dequeue_bulk(It itemFirst, size_t max) : size_t 
wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs) : size_t 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

项目优化>C++,concurrentqueue(高性能并发队列) 的相关文章

  • 关于构造函数,拷贝构造函数,析构函数的调用顺序(1)

    导言 对象是由 底层向上 开始构造的 xff0c 当建立一个对象时 xff0c 首先调用基类的构造函数 xff0c 然后调用下一个派生类的构造函数 xff0c 依次类推 xff0c 直至到达派生类次数最多的派生次数最多的类的构造函数为止 因
  • vector的内存释放

    xff11 vector内存分配机制 C 43 43 中vector的一个特点是 xff1a 内存空间只会增长 xff0c 不会减小 即为了支持快速的随机访问 xff0c vector容器的元素以连续方式存放 xff0c 每一个元素都挨着前
  • MFC多人在线聊天室

    我已经在我的资源里上传了这个聊天室的代码了 基于MFC的C 43 43 的select模型的TCP聊天室 采用select网络模型 xff0c 支持多人同时登陆 xff0c 功能有上线 下线 群聊 私聊 使用CjsonObject进行数据传
  • linux---进程间通信(ipc)之共享内存

    前面我们讲解了进程间通信之管道 xff0c 这段我们讲解我们的共享内存 共享内存是所有进程间通信方式最快的一种 内存共享模型就像下面的图一样 xff0c 就是将物理内存映射到我们进程的虚拟地址上 xff0c 我们就可以直接操作我们虚拟地址空
  • Effective C++总结

    explicit关键字 C 43 43 中的explicit关键字只能用于修饰只有一个参数或者是其他参数有默认值的类构造函数 它的作用是表明该构造函数是显式的 而非隐式的 跟它相对应的另一个关键字是implicit 意思是隐藏的 类构造函数
  • 计算机网络(5)TCP之重传机制

    重传机制 超时重传数据包丢失确认应答丢失 快速重传SACKD SACK例一 ACK 丢包例2 xff1a 网络延时 TCP 是通过序列号 确认应答 重发控制 连接管理以及窗口控制等机制实现可靠性传输的 TCP 实现可靠传输的方式之一 xff
  • 中断与回调

    1 xff0c 回调函数 回调函数的原理是使用函数指针实现类似 软中断 的概念 比如在上层的两个函数A和B xff0c 把自己的函数指针传给了C xff0c C通过调用A和B的函数指针达到 当做了什么 xff0c 通知上层来调用A或者B 的
  • CUDA 程序的优化(3) 任务划分

    4 3 1任务划分原则 首先 xff0c 需要将要处理的任务划分为几个连续的步骤 xff0c 并将其划分为CPU端程序和GPU端程序 划分时需要考虑的原则有 列出每个步骤的所有可以选择的算法 xff0c 并比较不同算法在效率和计算复杂度上的
  • C++ Matlab混合编程时“函数或变量 ‘matlabrc‘ 无法识别”

    在QT中调用Matlab初始化时 xff0c 出现了 函数或变量 matlabrc 无法识别 的情况 xff0c 接着崩溃 而且比较神奇的是 xff0c 前一次是可以初始化的 xff0c 但运行过程中发生了崩溃 直接搜解决办法 xff0c
  • Notepad++全选一整列的靠谱办法

    遇到行数较少的可以直接按住ALT手动选取 xff0c 但遇到行数较多 xff0c 就得这么干 xff1a 鼠标放在第一行某一列 xff0c 按住Alt 43 Shift xff0c 然后鼠标选择最后一行该列 xff0c 输入内容即可 xff
  • 对象转xml格式工具类

    import com ruiyun gui store haikang haikang bean FCSearchDescription import com ruiyun gui store haikang haikang bean FD
  • 【无标题】MQ静态图片获取

    public void getImageV40 String path Integer buildingProjectId HttpServletResponse response JSONObject param 61 new JSONO

随机推荐

  • 数据加解密时Base64异常:Illegal base64 character 3a

    现象 用base64工具类对中文进行处理时出现异常 xff0c 在数据加解密场景中经常使用 java lang IllegalArgumentException Illegal base64 character 3a at java uti
  • Winsock编程实例---TCP&UDP

    0x1 基于TCP的通信 1 服务端 1 1 创建基本流程 创建一个TCP服务端的程序需要调用的函数流程 xff1a 初始化函数库 gt gt WSAStartup 创建套接字 gt gt socket 绑定套接字 gt gt bind 监
  • 数据结构---选择排序(直接选择排序和堆排序图解)

    选择排序思想 xff1a 每一次从待排序的数据元素中选出最小 xff08 或最大 xff09 的一个元素 xff0c 存放在序列的起始位置 xff0c 直到全部待排序的 数据元素排完 直接选择排序 在元素集合array i array n
  • Java HttpUtils类

    Java HttpUtils类 Java HttpUtils类 定义 Public class HttpUtils 收集HTTP Servlet使用的静态的有效方法 方法 1 getRequestURL public static Stri
  • Ubuntu打造家用NAS三——网盘与影视中心

    Ubuntu打造家用NAS三 网盘与影视中心 一 Ubuntu 挂载硬盘 通过 Putty 连接 NAS查看硬盘位置 xff1a sudo fdisk l找到需要挂载的硬盘 xff0c 我的是 Disk dev sdb xff1a xxx
  • Ardupilot笔记:Rover auto模式下的执行流程

    先从mode auto cpp的update 开始分析 流程如图 xff1a 进入函数update 后会执行函数navigate to waypoint mode auto cpp span class token keyword void
  • 串口通信协议 UART+I2C+SPI

    UART 异步 串行 全双工 I2C SPI 不同通信协议比较 UART UART协议详解 UART通信 xff0c 接收与发送 xff08 详细版 xff0c 附代码 xff09 UART串行通信详解 待整理 UART是Universal
  • c语言---宏

    宏 1 仅仅替换 2 不能定义宏参类型 3 不会检查宏参有没有定义 定义带参数的宏 define JH a b t t 61 a a 61 b b 61 t xff0c 对两个参数a b的值进行交换 xff0c 下列表述中正确的是 xff0
  • Ros安装rosdep update出错,解决办法(从根本入手)

    博主作为一个ros刚入门的新手 xff0c 之前也安装过ros ros2但是在Ros安装在进行rosdep update 时运气与网络是成功的关键 xff0c 在尝试了好多次 xff0c 运气好一次就成功了 xff0c 运气不好得不停的试错
  • vscode使用方法

    01 ctrl 43 u 返回上一个光标焦点 02 发送请求插件 到VSCode插件中搜索REST Client 搜索到 xff0c 点击install进行安装 创建一个 http文件 编写测试接口文件 右键选择 发送请求 xff0c 测试
  • 自主飞行无人机开发--SALM cartographer开源框架 rplidar A2/3

    参考学习网站 xff1a https google cartographer ros readthedocs io en latest 问题提出 xff1a 四旋翼搭载激光雷达A3进行SLAM室内定位 xff0c 其怎样Running Ca
  • C#串口通信中的奇偶性校验、CRC校验函数

    一般来说 xff0c 通信协议中的通用数据格式是 包头 43 指令码 43 数据长度 43 有效数据 43 校验码 43 包尾 其中 xff0c 校验方式有多种 xff0c 最流行的是CRC校验方式 xff0c 其次是简单的奇偶性校验 校验
  • 测试软件安装步骤

    目录 Ja 目录 Java安装 jdk下载 jdk环境配置 phpstudy安装 禅道的安装 xampp安装 postman安装 requests parameterized jmeter安装 JVM监控 Locust SecureCRT软
  • linux---进程信号

    进程的功能以及概念信号的生命周期以及相关的接口自定义信号的捕捉流程信号集以及阻塞信号集了解一个SIGCHLD信号 信号的功能以及概念 信号的功能 xff1a 信号就是通知我们某一个事件的发生 信号的概念 xff1a 信号就是一个软件中断 x
  • 双目立体视觉:四(双目标定matlab,图像校正,图像匹配,计算视差,disparity详解,)

    二郎也比较忙 xff0c 在某大场工作 xff0c 有时候没有时间回复 如果希望二郎尽快帮忙 xff0c 可以将代码 xff0c 数据和问题发给二郎 xff0c 谢谢大家理解 glwang20 64 mails jlu edu cn 不过还
  • conda解决 An HTTP error occurred when trying to retrieve this URL.问题

    遇到 xff1a Collecting package metadata current repodata json failed CondaHTTPError HTTP 000 CONNECTION FAILED for url lt h
  • ubuntu20.04 安装 WPS 2019

    ubuntu自带的文字处理软件对来自windows下office或在WPS创建的ppt有点不兼容 xff0c 看到WPS有linux版本的 xff0c 便果断安装试一试 一 卸载原生liboffice sudo apt get remove
  • 制作ubuntu系统的 usb启动盘

    前言 由于课程的要求 xff0c 要尝试完成 编译安装Linux系统的内核 xff0c 但是在编译内核的过程中 xff0c Ubuntu的grub引导好像出了一些问题 xff0c 不能进入系统了 所以就要制作一个usb启动盘 xff0c 看
  • 项目用到的开源库>http-parser(一个HTTP消息解析器)

    http parser 定义 xff1a 一个用C编写的HTTP消息解析器 xff0c 可以解析请求和响应 优点 xff1a 不会进行任何系统调用及内存分配 xff0c 它不会缓冲数据 xff0c 它可以被随时中断 根据你的体系结构 xff
  • 项目优化>C++,concurrentqueue(高性能并发队列)

    项目中的数据队列基于轮询和selep的实时性及CUP性能差 xff0c 需要进行优化 xff0c 尝试使用concurrentqueue进行优化 网上有一些资料介绍 可供参考 使用后的个人理解 一个线程安全的queue xff0c 并且co