


 -  如何保证生产者与消费者的线程安全?
 - 生产者与生产者应该具有互斥关系
 - 消费者与消费者之间应该具有互斥关系
 - 生产者和消费者之间应该具有同步与互斥


-  一个场所:
- 两种角色:
- 三种关系:


  1 #include <iostream>
  2 #include <queue>
  3 #include <stdlib.h>
  4 #include <unistd.h>
  5 class Blockqueue{
  6     public:
  7         //构造函数,初始化我们的锁和队列的大小
  8         Blockqueue(int cap = 10):_capcity(cap){
  9             pthread_mutex_init(&_mutex,NULL);
 10             pthread_cond_init(&_cond_prodoct,NULL);
 11             pthread_cond_init(&_cond_consumer,NULL);
 12         }
 13         ~Blockqueue(){//析构函数,销毁我们的锁和条件变量
 14             pthread_mutex_destroy(&_mutex);
 15             pthread_cond_destroy(&_cond_prodoct);
 16             pthread_cond_destroy(&_cond_consumer);
 17         }
 18         //提供公共的接口。出栈和入栈
 19         bool QueuePush(int data){
 20             //加锁
 21             QueueLock();
 22             while(QueueIsfull()){//当队列满了生产者等待等待
 23                 ProductorWait();
 24             }
 25             //进行入队列的操作
 26             _queue.push(data);
 27             //唤醒我们的消费者
 28             ConsumerWakeup();
 29             //解锁
 30             QueueUnlock();
 31             return true;
 32         }
 33         bool QueuePop(int* data){
 34             QueueLock();//加锁
 35             while(QueueIsempty()){//当队列是空的就等待
 36                 ConsumerWait();
 37             }
 38             //进行出队列的操作
 39             *data = _queue.front();
 40             _queue.pop();
 41             //唤醒我们的生产者
 42             ProductorWakeup();
 43             QueueUnlock();
 44             return true;
 45         }
 46     private:
 47         //实现上面的小函数
 48         void QueueLock(){
 49             pthread_mutex_lock(&_mutex);
 50         }
 51         void QueueUnlock(){
 52             pthread_mutex_unlock(&_mutex);
 53         }
 54         void ProductorWait(){
 55             pthread_cond_wait(&_cond_prodoct,&_mutex);
 56         }
 57         void ConsumerWait(){
 58             pthread_cond_wait(&_cond_consumer,&_mutex);
 59         }
 60         void ProductorWakeup(){
 61             pthread_cond_signal(&_cond_prodoct);
 62         }
 63         void ConsumerWakeup(){
 64             pthread_cond_signal(&_cond_consumer);
 65         }
 66         bool QueueIsfull(){
 67             return (_queue.size() == _capcity);
 68         }
 69         bool QueueIsempty(){
 70             return _queue.empty();
 71         }
 72     private:
 73         //一个队列
 74         std::queue<int> _queue;
 75         int _capcity;//容量
 76         pthread_mutex_t _mutex;//一个锁
 77         pthread_cond_t _cond_prodoct;
 78         pthread_cond_t _cond_consumer;
 80 };
 81 void* thr_product(void* arg){
 82     Blockqueue* p = (Blockqueue*)arg;
 83     int i = 0;
 84     while(1){
 85         p->QueuePush(i++);
 86         std::cout<<"生产者生产数据:"<<i<<std::endl;
 87     }
 88     return NULL;
 89 }
 90 void* thr_consumer(void* arg){
 91     Blockqueue* p = (Blockqueue*)arg;
 92     int data;
 93     while(1){
 94         p->QueuePop(&data);
 95         std::cout<<"消费者使用数据:"<<data<<std::endl;
 96     }
 97     return NULL;
 98 }
 99 int main(){
100     pthread_t ptid[4],ctid[4];
101     //创建四个生产者和四个消费者线程
102     //创建一个对垒
103     Blockqueue q;
104     int i = 0;
105     int ret;
106     for(i = 0;i < 4; i++){
107         ret = pthread_create(&ptid[i],NULL,thr_product,(void*)&q);
108         if(ret < 0){
109             std::cout<<"创建线程错误"<<std::endl;
110             return 0;
111         }
112     }
113     for(i = 0;i < 4; i++){
114         ret = pthread_create(&ctid[i],NULL,thr_consumer,(void*)&q);
115         if(ret < 0){
116             std::cout<<"创建线程错误"<<std::endl;
117             return 0;
118         }
119     }
120     //线程回收
121     for(int i = 0;i < 4;i++){
122         pthread_join(ptid[i],NULL);
123     }
124     for(int i = 0;i < 4;i++){
125         pthread_join(ctid[i],NULL);
126     }
128     return 0;
129 }  


  • 信号量的初始化
       #include <semaphore.h>
       int sem_init(sem_t *sem, int pshared, unsigned int value);
       Link with -pthread.


  • 信号量等待
       #include <semaphore.h>
       int sem_wait(sem_t *sem);
       int sem_trywait(sem_t *sem);
       int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
       Link with -pthread.


  • 信号量唤醒
       #include <semaphore.h>
       int sem_post(sem_t *sem);
       Link with -pthread.


  • 信号量销毁
       #include <semaphore.h>
       int sem_destroy(sem_t *sem);
       Link with -pthread.


  1 #include <iostream>
  2 #include <vector>
  3 #include <semaphore.h>
  4 #include <pthread.h>
  5 class Blockqueue{
  6     public:
  7     Blockqueue(int cap = 10):_queue(10),_capacity(cap),
  8     _read_step(0),_write_step(0)
  9     {
 10     //对我们三个信号量进行初始化
 11     //int sem_init(sem_t *sem, int pshared, unsigned int
 12     //value);
 13         sem_init(&_sem_data,0,0);
 14         sem_init(&_sem_idle,0,cap);
 15         sem_init(&_sem_lock,0,1);
 16     }
 17     ~Blockqueue(){
 18         sem_destroy(&_sem_data);
 19         sem_destroy(&_sem_idle);
 20         sem_destroy(&_sem_lock);
 21     }
 23     //提供公共的接口
 24     bool QueuePush_back(int data){
 25         //生产者等待
 26         ProductWait();
 27         //加锁
 28         QueueLock();
 29         //生产数据
 30         _queue[_write_step] = data;
 31         _write_step = (_write_step+1)% _capacity;
 32         //解锁
 33         QueueUnlock();
 34         //唤醒生产者
 35         ConsumerWakeup();
 36         return true;
 37     }
 38     bool QueuePop(int* data){
 39         //消费者等地啊
 40         ConsumerWait();
 41         //加锁
 42         QueueLock();
 43         //消费数据
 44         *data = _queue[_read_step];
 45         _read_step = (_read_step+1)%_capacity;
 46         //解锁
 47         QueueUnlock();
 48         //唤醒我们的生产者
 49         ProductWakeup();
 50         return true;
 51     }
 52     private:
 53         void QueueLock(){//加锁
 54             sem_wait(&_sem_lock);
 55         }
 56         void QueueUnlock(){
 57             sem_post(&_sem_lock);
 58         }
 59         void ProductWakeup(){
 60             sem_post(&_sem_idle);
 61         }
 62         void ProductWait(){
 63             sem_wait(&_sem_idle);
 64         }
 65         //此时对于消费者来说的话我们是对于数据资源来说的
 66         void ConsumerWait(){
 67             sem_wait(&_sem_data);
 68         }
 69         void ConsumerWakeup(){
 70             sem_post(&_sem_data);
 71         }
 72     private:
 73         //一个队列
 74         std::vector<int> _queue;
 75         int _capacity;//容量
 76         int _read_step;//读的位置
 77         int _write_step;//写的位置
 78         sem_t _sem_data;//数据资源空间
 79         sem_t _sem_idle;//空闲资源空间
 80         sem_t _sem_lock;//实现互斥的信号量
 82 };
 83 void* thr_consumer(void* arg){
 84     Blockqueue* b = (Blockqueue*)arg;
 85     int data;
 86     while(1){
 87         b->QueuePop(&data);
 88         std::cout<<"消费者消费数据"<<data<<std::endl;
 89     }
 90     return NULL;
 91 }
 92 void* thr_productor(void* arg){
 93     Blockqueue* b = (Blockqueue*)arg;
 94     int i = 0;
 95     while(1){
 96         std::cout<<"生产者生产了数据"<<i<<std::endl;
 97         b->QueuePush_back(i++);
 98     }
 99     return NULL;
100 }
101 int main(){
102     pthread_t ctid[4],ptid[4];
103     //创建四个生产者和四个消费者
104     Blockqueue b;
105     int ret = 0;
106     int i = 0;
107     for(i = 0;i < 4; i++){
108         ret = pthread_create(&ctid[i],NULL,thr_consumer,(void*)&b);
109         if(ret < 0){
110             std::cout<<"线程创建出现问题"<<std::endl;
111             return 0;
112         }
113     }
114     for(i = 0;i < 4; i++){
115         ret = pthread_create(&ptid[i],NULL,thr_productor,(void*)&b);
116         if(ret < 0){
117             std::cout<<"线程创建出现问题"<<std::endl;
118             return 0;
119         }
120     }
121     for(int i = 0;i < 4;i++){
122         pthread_join(ctid[i],NULL);
123     }
124     for(int i = 0;i < 4;i++){
125         pthread_join(ptid[i],NULL);
126     }
127     return 0;
128 }  

