linux---生产者和消费者模型(条件变量和信号量实现)

2023-05-16

问题的提出
在我们对一些全局变量的进行非原子性操作的时候就可能出现非线程安全,比如我们吃面的问题。
在这里插入图片描述
我们做面的人就是生产者,吃面的人就是我们的消费者,当我们的消费者需要吃面的时候就唤醒我们的生产者进行生产,当我们有面的时候我们的生产者就不继续生产面条,去唤醒我们的消费者进行消费。
生产者和消费者模型:

 -  如何保证生产者与消费者的线程安全?
 - 生产者与生产者应该具有互斥关系
 - 消费者与消费者之间应该具有互斥关系
 - 生产者和消费者之间应该具有同步与互斥

实现生产者和消费者模型:

-  一个场所:
	 	一个场所就是我们多个线程能够同时操作的,比如一个全局变量的链表或者一个类中的队列等等
- 两种角色:
		两种角色是我们的生产者和消费者
- 三种关系:
		三种关系就是生产者与生产者、消费者与消费者、生产者和消费者时间的关系

代码实现:
特别指出:因为促使条件满足之后,pthread_con_singnal唤醒至少一个等待线程,导致因为条件的判断是一个if语句而造成一碗面多吃的情况,第一个吃面的人加锁吃面之后解锁,第二个吃面的人被唤醒继续吃面,此时条件的判断需要使用while,因为促使条件满足后,othrad_cond_wait唤醒是所有等待在条件变量线程,但是有可能唤醒的线程也是一个坐满的线程,因为已经有面条,条件不满足而陷入等待,导致死等,本质的原因就是唤醒了错误的角色。(因为不同的角色等待在统一条件变量上);

  1 #include <iostream>
  2 #include <queue>
  3 #include <stdlib.h>
  4 #include <unistd.h>
  5 class Blockqueue{
  6     public:
  7         //构造函数,初始化我们的锁和队列的大小
  8         Blockqueue(int cap = 10):_capcity(cap){
  9             pthread_mutex_init(&_mutex,NULL);
 10             pthread_cond_init(&_cond_prodoct,NULL);
 11             pthread_cond_init(&_cond_consumer,NULL);
 12         }
 13         ~Blockqueue(){//析构函数,销毁我们的锁和条件变量
 14             pthread_mutex_destroy(&_mutex);
 15             pthread_cond_destroy(&_cond_prodoct);
 16             pthread_cond_destroy(&_cond_consumer);
 17         }
 18         //提供公共的接口。出栈和入栈
 19         bool QueuePush(int data){
 20             //加锁
 21             QueueLock();
 22             while(QueueIsfull()){//当队列满了生产者等待等待
 23                 ProductorWait();
 24             }
 25             //进行入队列的操作
 26             _queue.push(data);
 27             //唤醒我们的消费者
 28             ConsumerWakeup();
 29             //解锁
 30             QueueUnlock();
 31             return true;
 32         }
 33         bool QueuePop(int* data){
 34             QueueLock();//加锁
 					//此时就是唤醒我们的队列的时候应该是while虚循环等待
 35             while(QueueIsempty()){//当队列是空的就等待
 36                 ConsumerWait();
 37             }
 38             //进行出队列的操作
 39             *data = _queue.front();
 40             _queue.pop();
 41             //唤醒我们的生产者
 42             ProductorWakeup();
 43             QueueUnlock();
 44             return true;
 45         }
 46     private:
 47         //实现上面的小函数
 48         void QueueLock(){
 49             pthread_mutex_lock(&_mutex);
 50         }
 51         void QueueUnlock(){
 52             pthread_mutex_unlock(&_mutex);
 53         }
 54         void ProductorWait(){
 55             pthread_cond_wait(&_cond_prodoct,&_mutex);
 56         }
 57         void ConsumerWait(){
 58             pthread_cond_wait(&_cond_consumer,&_mutex);
 59         }
 60         void ProductorWakeup(){
 61             pthread_cond_signal(&_cond_prodoct);
 62         }
 63         void ConsumerWakeup(){
 64             pthread_cond_signal(&_cond_consumer);
 65         }
 66         bool QueueIsfull(){
 67             return (_queue.size() == _capcity);
 68         }
 69         bool QueueIsempty(){
 70             return _queue.empty();
 71         }
 72     private:
 73         //一个队列
 74         std::queue<int> _queue;
 75         int _capcity;//容量
 76         pthread_mutex_t _mutex;//一个锁
 77         pthread_cond_t _cond_prodoct;
 78         pthread_cond_t _cond_consumer;
 79 
 80 };
 81 void* thr_product(void* arg){
 82     Blockqueue* p = (Blockqueue*)arg;
 83     int i = 0;
 84     while(1){
 85         p->QueuePush(i++);
 86         std::cout<<"生产者生产数据:"<<i<<std::endl;
 87     }
 88     return NULL;
 89 }
 90 void* thr_consumer(void* arg){
 91     Blockqueue* p = (Blockqueue*)arg;
 92     int data;
 93     while(1){
 94         p->QueuePop(&data);
 95         std::cout<<"消费者使用数据:"<<data<<std::endl;
 96     }
 97     return NULL;
 98 }
 99 int main(){
100     pthread_t ptid[4],ctid[4];
101     //创建四个生产者和四个消费者线程
102     //创建一个对垒
103     Blockqueue q;
104     int i = 0;
105     int ret;
106     for(i = 0;i < 4; i++){
107         ret = pthread_create(&ptid[i],NULL,thr_product,(void*)&q);
108         if(ret < 0){
109             std::cout<<"创建线程错误"<<std::endl;
110             return 0;
111         }
112     }
113     for(i = 0;i < 4; i++){
114         ret = pthread_create(&ctid[i],NULL,thr_consumer,(void*)&q);
115         if(ret < 0){
116             std::cout<<"创建线程错误"<<std::endl;
117             return 0;
118         }
119     }
120     //线程回收
121     for(int i = 0;i < 4;i++){
122         pthread_join(ptid[i],NULL);
123     }
124     for(int i = 0;i < 4;i++){
125         pthread_join(ctid[i],NULL);
126     }
127     
128     return 0;
129 }  

这里介绍一个新的知识点:信号量
信号量就是计数器+等待队列+等待+唤醒,他是一个自带计数体的条件变量,我们不需要进行条件的判断了,此时计数器本身就是一个判断的条件。
功能:实现线程与进程中间的同步与互斥
计数器->判断的条件,当计数只是0/1的时候那么此时就相当于就是一个互斥量。
信号量接口:

  • 信号量的初始化
       #include <semaphore.h>
       int sem_init(sem_t *sem, int pshared, unsigned int value);
       Link with -pthread.

参数:
sem:设置的信号量的变量
pshared:设置用于线程还是进程,0用于线程,1用于进程,
value:资源计数器的初值

  • 信号量等待
       #include <semaphore.h>
       int sem_wait(sem_t *sem);
       int sem_trywait(sem_t *sem);
       int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
       Link with -pthread.

sem_wait是等待,sem_trywait是尝试等待,sem_timedwait是限时等待。(明白等待是计数大于0的时候wait操作之后计数减1,直到计数小于等于0的时候则阻塞等待)
参数:
sem:是我们定义的信号量变量
abs_timeout:是一个时间结构体

  • 信号量唤醒
       #include <semaphore.h>
       int sem_post(sem_t *sem);
       Link with -pthread.

唤醒等待操作,则计数+1;
参数:
sem:定义的信号量变量

  • 信号量销毁
       #include <semaphore.h>
       int sem_destroy(sem_t *sem);
       Link with -pthread.

信号量和条件变量的区别:
信号量拥有资源计数的功能,临界资源是否能够操作通过自身计数判断,条件变量是搭配互斥锁进行一起使用
信号量还可以实现互斥计数仅仅为0/1.
信号量实现生产者消费者模型

  1 #include <iostream>
  2 #include <vector>
  3 #include <semaphore.h>
  4 #include <pthread.h>
  5 class Blockqueue{
  6     public:
  7     Blockqueue(int cap = 10):_queue(10),_capacity(cap),
  8     _read_step(0),_write_step(0)
  9     {
 10     //对我们三个信号量进行初始化
 11     //int sem_init(sem_t *sem, int pshared, unsigned int
 12     //value);
 13         sem_init(&_sem_data,0,0);
 14         sem_init(&_sem_idle,0,cap);
 15         sem_init(&_sem_lock,0,1);
 16     }
 17     ~Blockqueue(){
 18         sem_destroy(&_sem_data);
 19         sem_destroy(&_sem_idle);
 20         sem_destroy(&_sem_lock);
 21     }
 22 
 23     //提供公共的接口
 24     bool QueuePush_back(int data){
 25         //生产者等待
 26         ProductWait();
 27         //加锁
 28         QueueLock();
 29         //生产数据
 30         _queue[_write_step] = data;
 31         _write_step = (_write_step+1)% _capacity;
 32         //解锁
 33         QueueUnlock();
 34         //唤醒生产者
 35         ConsumerWakeup();
 36         return true;
 37     }
 38     bool QueuePop(int* data){
 39         //消费者等地啊
 40         ConsumerWait();
 41         //加锁
 42         QueueLock();
 43         //消费数据
 44         *data = _queue[_read_step];
 45         _read_step = (_read_step+1)%_capacity;
 46         //解锁
 47         QueueUnlock();
 48         //唤醒我们的生产者
 49         ProductWakeup();
 50         return true;
 51     }
 52     private:
 53         void QueueLock(){//加锁
 54             sem_wait(&_sem_lock);
 55         }
 56         void QueueUnlock(){
 57             sem_post(&_sem_lock);
 58         }
 59         void ProductWakeup(){
 60             sem_post(&_sem_idle);
 61         }
 62         void ProductWait(){
 63             sem_wait(&_sem_idle);
 64         }
 65         //此时对于消费者来说的话我们是对于数据资源来说的
 66         void ConsumerWait(){
 67             sem_wait(&_sem_data);
 68         }
 69         void ConsumerWakeup(){
 70             sem_post(&_sem_data);
 71         }
 72     private:
 73         //一个队列
 74         std::vector<int> _queue;
 75         int _capacity;//容量
 76         int _read_step;//读的位置
 77         int _write_step;//写的位置
 78         sem_t _sem_data;//数据资源空间
 79         sem_t _sem_idle;//空闲资源空间
 80         sem_t _sem_lock;//实现互斥的信号量
 81 
 82 };
 83 void* thr_consumer(void* arg){
 84     Blockqueue* b = (Blockqueue*)arg;
 85     int data;
 86     while(1){
 87         b->QueuePop(&data);
 88         std::cout<<"消费者消费数据"<<data<<std::endl;
 89     }
 90     return NULL;
 91 }
 92 void* thr_productor(void* arg){
 93     Blockqueue* b = (Blockqueue*)arg;
 94     int i = 0;
 95     while(1){
 96         std::cout<<"生产者生产了数据"<<i<<std::endl;
 97         b->QueuePush_back(i++);
 98     }
 99     return NULL;
100 }
101 int main(){
102     pthread_t ctid[4],ptid[4];
103     //创建四个生产者和四个消费者
104     Blockqueue b;
105     int ret = 0;
106     int i = 0;
107     for(i = 0;i < 4; i++){
108         ret = pthread_create(&ctid[i],NULL,thr_consumer,(void*)&b);
109         if(ret < 0){
110             std::cout<<"线程创建出现问题"<<std::endl;
111             return 0;
112         }
113     }
114     for(i = 0;i < 4; i++){
115         ret = pthread_create(&ptid[i],NULL,thr_productor,(void*)&b);
116         if(ret < 0){
117             std::cout<<"线程创建出现问题"<<std::endl;
118             return 0;
119         }
120     }
121     for(int i = 0;i < 4;i++){
122         pthread_join(ctid[i],NULL);
123     }
124     for(int i = 0;i < 4;i++){
125         pthread_join(ptid[i],NULL);
126     }
127     return 0;
128 }  
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

linux---生产者和消费者模型(条件变量和信号量实现) 的相关文章

  • Ros安装rosdep update出错,解决办法(从根本入手)

    博主作为一个ros刚入门的新手 xff0c 之前也安装过ros ros2但是在Ros安装在进行rosdep update 时运气与网络是成功的关键 xff0c 在尝试了好多次 xff0c 运气好一次就成功了 xff0c 运气不好得不停的试错
  • vscode使用方法

    01 ctrl 43 u 返回上一个光标焦点 02 发送请求插件 到VSCode插件中搜索REST Client 搜索到 xff0c 点击install进行安装 创建一个 http文件 编写测试接口文件 右键选择 发送请求 xff0c 测试
  • 自主飞行无人机开发--SALM cartographer开源框架 rplidar A2/3

    参考学习网站 xff1a https google cartographer ros readthedocs io en latest 问题提出 xff1a 四旋翼搭载激光雷达A3进行SLAM室内定位 xff0c 其怎样Running Ca
  • C#串口通信中的奇偶性校验、CRC校验函数

    一般来说 xff0c 通信协议中的通用数据格式是 包头 43 指令码 43 数据长度 43 有效数据 43 校验码 43 包尾 其中 xff0c 校验方式有多种 xff0c 最流行的是CRC校验方式 xff0c 其次是简单的奇偶性校验 校验
  • 测试软件安装步骤

    目录 Ja 目录 Java安装 jdk下载 jdk环境配置 phpstudy安装 禅道的安装 xampp安装 postman安装 requests parameterized jmeter安装 JVM监控 Locust SecureCRT软
  • linux---进程信号

    进程的功能以及概念信号的生命周期以及相关的接口自定义信号的捕捉流程信号集以及阻塞信号集了解一个SIGCHLD信号 信号的功能以及概念 信号的功能 xff1a 信号就是通知我们某一个事件的发生 信号的概念 xff1a 信号就是一个软件中断 x
  • 双目立体视觉:四(双目标定matlab,图像校正,图像匹配,计算视差,disparity详解,)

    二郎也比较忙 xff0c 在某大场工作 xff0c 有时候没有时间回复 如果希望二郎尽快帮忙 xff0c 可以将代码 xff0c 数据和问题发给二郎 xff0c 谢谢大家理解 glwang20 64 mails jlu edu cn 不过还
  • conda解决 An HTTP error occurred when trying to retrieve this URL.问题

    遇到 xff1a Collecting package metadata current repodata json failed CondaHTTPError HTTP 000 CONNECTION FAILED for url lt h
  • ubuntu20.04 安装 WPS 2019

    ubuntu自带的文字处理软件对来自windows下office或在WPS创建的ppt有点不兼容 xff0c 看到WPS有linux版本的 xff0c 便果断安装试一试 一 卸载原生liboffice sudo apt get remove
  • 制作ubuntu系统的 usb启动盘

    前言 由于课程的要求 xff0c 要尝试完成 编译安装Linux系统的内核 xff0c 但是在编译内核的过程中 xff0c Ubuntu的grub引导好像出了一些问题 xff0c 不能进入系统了 所以就要制作一个usb启动盘 xff0c 看
  • 项目用到的开源库>http-parser(一个HTTP消息解析器)

    http parser 定义 xff1a 一个用C编写的HTTP消息解析器 xff0c 可以解析请求和响应 优点 xff1a 不会进行任何系统调用及内存分配 xff0c 它不会缓冲数据 xff0c 它可以被随时中断 根据你的体系结构 xff
  • 项目优化>C++,concurrentqueue(高性能并发队列)

    项目中的数据队列基于轮询和selep的实时性及CUP性能差 xff0c 需要进行优化 xff0c 尝试使用concurrentqueue进行优化 网上有一些资料介绍 可供参考 使用后的个人理解 一个线程安全的queue xff0c 并且co
  • shell>sed (对文件的内容进行替换)

    在生产环境中经常会遇到对配置文件的修改 xff0c 那写成一个脚本 xff0c 传个参数修改配置文件不就方便多了吗 对一个文件的操作那我暂且分为增删改查了 写在前面 i 这个参数很重要 xff0c 若需要更改源文件 xff0c 请配合 i参
  • Linux>软链接的作用与使用

    软链接 xff1a 为某一个文件在另外一个位置建立一个同不的链接 类似于Windows下的快捷方式 好处 xff1a 仅仅是个链接 xff0c 不占用空间 xff0c 使用还是照常使用 实际生产环境中 xff0c 使用第三方的工具 xff0
  • WPF--->自定义控件中添加按钮,TextBox添加Button为例

    span class token tag span class token tag span class token punctuation lt span Style span span class token attr name spa
  • STM32F767--->串口通信接收不定长数据的处理方法

    文章目录 写在前面超时中断相关寄存器接收器超时寄存器 USARTx RTOR 空闲中断整体思路 附录程序代码参考 写在前面 使用STM32F767有两种中断方式 超时中断和空闲中断 接不定长的数据 xff0c 其中超时中断要比空闲中断好用
  • STM32F7--->串口相关,串口的所有寄存器详讲

    文章目录 串口相关寄存器串口控制寄存器CR1CR2CR3 波特率寄存器 BRR保护时间和预分频寄存器 GTPR接收超时寄存器 RTOR请求寄存器 RQR中断和状态寄存器 ISR中断标志清零寄存器 ICR接收数据寄存器 RDR发送数据寄存器
  • linux---线程概念和线程控制

    1 什么是线程 2 线程和进程的关系 3 线程的工作原理 4 线程和进程实现并发任务的优 缺 5 线程控制 什么是线程 在传统的操作系统中 xff0c 进程就是一个pcb xff08 进行运行中的程序的描述信息 xff09 xff0c 控制
  • Docker--->文件/文件夹的挂载映射

    宿主机的文件夹挂载映射 span class token function docker span run it v d span class token punctuation span dockr attach span class t
  • Docker--->总目录

    Docker Docker xff1e 安装与简介 Docker xff1e 创建镜像到gitlab仓库 Docker xff1e 容器的创建启动进入 Docker xff1e 文件 文件夹的挂载映射 在VSCode中打开容器 删除容器 x

随机推荐