一个场所: 队列
两个角色: 生产者、消费者
三种关系:
生产者-生产者:互斥
消费者-消费者:互斥
生产者-消费者:同步+互斥
如果生产者直接将数据交给消费者,那么代码的耦合性将会非常强;而且如果生产数据特别快而消费数据特别慢,将会造成数据堆积,数据可能会丢失
因此引入一个场所(队列)作为缓冲:生产者将数据放到场所中,消费者从场所中拿取数据
场所的作用:
解耦合
支持忙闲不均
支持并发
条件变量实现
#include <cstdio>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#define MAX_QUEUE 5
/*
 * Bounded blocking queue of ints shared between producer and consumer threads.
 *
 * One mutex protects the underlying std::queue. Producers wait on _cond_pro
 * while the queue holds MAX_QUEUE items; consumers wait on _cond_con while it
 * is empty. Every public method takes the mutex, so all of them may be called
 * concurrently from any thread.
 */
class BlockQueue {
public:
    BlockQueue() {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_cond_pro, NULL);
        pthread_cond_init(&_cond_con, NULL);
    }

    ~BlockQueue() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond_pro);
        pthread_cond_destroy(&_cond_con);
    }

    /* Appends `data`, blocking while the queue is full. */
    void Push(int data) {
        pthread_mutex_lock(&_mutex);
        // Re-check in a loop: a wakeup may be spurious, or another producer
        // may have refilled the slot before this thread reacquired the mutex.
        while (_queue.size() == MAX_QUEUE) {
            pthread_cond_wait(&_cond_pro, &_mutex);
        }
        _queue.push(data);
        pthread_cond_signal(&_cond_con);
        pthread_mutex_unlock(&_mutex);
    }

    /* Removes the oldest item into `*data`, blocking while the queue is empty. */
    void Pop(int *data) {
        pthread_mutex_lock(&_mutex);
        // Same loop-guard reasoning as in Push.
        while (_queue.empty()) {
            pthread_cond_wait(&_cond_con, &_mutex);
        }
        *data = _queue.front();
        _queue.pop();
        pthread_cond_signal(&_cond_pro);
        pthread_mutex_unlock(&_mutex);
    }

    /*
     * Returns the number of queued items at the moment of the call.
     * The mutex is taken so the read cannot race with a concurrent Push/Pop;
     * the value may already be stale by the time the caller uses it.
     */
    int Size() {
        pthread_mutex_lock(&_mutex);
        int size = static_cast<int>(_queue.size());
        pthread_mutex_unlock(&_mutex);
        return size;
    }

private:
    // Non-copyable: a copy would duplicate the pthread primitives and
    // destroy them twice. Declared but intentionally not defined.
    BlockQueue(const BlockQueue&);
    BlockQueue& operator=(const BlockQueue&);

    std::queue<int> _queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_pro;  // producers wait here for free space
    pthread_cond_t _cond_con;  // consumers wait here for data
};
// Guards `resources`, the counter shared by all producer threads.
pthread_mutex_t mut;
// Next value a producer will push.
int resources = 0;

/*
 * Producer thread entry point.
 *
 * arg: pointer to the shared BlockQueue.
 * Loops forever: claims the next counter value, pushes it, logs it, sleeps.
 */
void* thr_pro(void* arg) {
    BlockQueue* q = (BlockQueue*)arg;
    while (1) {
        pthread_mutex_lock(&mut);
        // Keep a private copy of the produced value: reading `resources`
        // after unlocking would race with the other producers and would
        // report the *next* value rather than the one just pushed.
        int data = resources++;
        // Push stays inside the lock so items enter the queue in counter order.
        q->Push(data);
        pthread_mutex_unlock(&mut);
        // pthread_t is an opaque type; cast explicitly to match %lu.
        printf(" %lu product %04d %6d\n", (unsigned long)pthread_self(), data, q->Size());
        usleep(200000);
    }
    return NULL;
}
void* thr_con(void* arg) {
BlockQueue* q = (BlockQueue*)arg;
while (1) {
int data;
q->Pop(&data);
printf("%lu consume %04d %6d\n", pthread_self(), data, q->Size());
usleep(200000);
}
return NULL;
}
int main() {
BlockQueue* q = new BlockQueue;
pthread_mutex_init(&mut, NULL);
pthread_t pro_tid[4], con_tid[4];
for (int i = 0; i < 4; ++i) {
int ret = pthread_create(&pro_tid[i], NULL, thr_pro, (void*)q);
if (ret != 0) {
perror("pthread_create error~~\n");
return -1;
}
ret = pthread_create(&con_tid[i], NULL, thr_con, (void*)q);
if (ret != 0) {
perror("pthread_create error~~\n");
return -1;
}
}
for (int i = 0; i < 4; ++i) {
pthread_join(pro_tid[i], NULL);
pthread_join(con_tid[i], NULL);
}
pthread_mutex_destroy(&mut);
return 0;
}
信号量实现
#include <vector>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#define MAX_QUEUE 5
/*
 * Fixed-capacity ring buffer of ints synchronized with POSIX semaphores.
 *
 *   _sem_space counts free slots  (producers wait on it),
 *   _sem_data  counts filled slots (consumers wait on it),
 *   _sem_lock  is a binary semaphore used as a mutex around the indices.
 *
 * _front is the next write position and _rear the next read position.
 */
class RingQueue {
public:
    /* cap: number of slots in the ring; defaults to MAX_QUEUE. */
    RingQueue(int cap = MAX_QUEUE)
        : _queue(cap)  // size the storage from `cap`, not a fixed 5, so cap > 5 stays in bounds
        , _front(0)
        , _rear(0)
        , _capacity(cap)
        , _size(0)
    {
        sem_init(&_sem_lock, 0, 1);
        sem_init(&_sem_data, 0, 0);
        sem_init(&_sem_space, 0, _capacity);
    }

    ~RingQueue() {
        sem_destroy(&_sem_lock);
        sem_destroy(&_sem_data);
        sem_destroy(&_sem_space);
    }

    /* Stores `data` in the next free slot, blocking while the ring is full. */
    void Push(int data) {
        // Reserve a slot first, then take the lock; the opposite order could
        // block while holding the lock and stall every consumer.
        sem_wait(&_sem_space);
        sem_wait(&_sem_lock);
        _queue[_front] = data;
        _front = (_front + 1) % _capacity;
        ++_size;
        sem_post(&_sem_lock);
        sem_post(&_sem_data);
    }

    /* Copies the oldest item into `*data`, blocking while the ring is empty. */
    void Pop(int *data) {
        sem_wait(&_sem_data);
        sem_wait(&_sem_lock);
        *data = _queue[_rear];
        _rear = (_rear + 1) % _capacity;
        --_size;
        sem_post(&_sem_lock);
        sem_post(&_sem_space);
    }

    /*
     * Returns the number of stored items at the moment of the call.
     * An explicit counter is used because _front == _rear both when the ring
     * is empty and when it is full, so the indices alone cannot tell the two
     * apart. The lock keeps the read from racing with Push/Pop.
     */
    int Size() {
        sem_wait(&_sem_lock);
        int size = _size;
        sem_post(&_sem_lock);
        return size;
    }

private:
    // Non-copyable: a copy would duplicate the semaphores and destroy them
    // twice. Declared but intentionally not defined.
    RingQueue(const RingQueue&);
    RingQueue& operator=(const RingQueue&);

    std::vector<int> _queue;
    int _front;     // next write index
    int _rear;      // next read index
    int _capacity;  // number of slots in _queue
    int _size;      // number of filled slots, guarded by _sem_lock
    sem_t _sem_lock;
    sem_t _sem_data;
    sem_t _sem_space;
};
// Next value a producer will push; shared by all producer threads.
int resources = 0;
// Binary semaphore used as a mutex around `resources`.
sem_t sem_lock;

/*
 * Producer thread entry point.
 *
 * arg: pointer to the shared RingQueue.
 * Loops forever: claims the next counter value, pushes it, logs it, sleeps.
 */
void* thr_pro(void* arg) {
    RingQueue* q = (RingQueue*)arg;
    while (1) {
        sem_wait(&sem_lock);
        // Keep a private copy of the produced value: reading `resources`
        // after releasing the lock would race with the other producers and
        // would report the *next* value rather than the one just pushed.
        int data = resources++;
        // Push stays inside the lock so items enter the queue in counter order.
        q->Push(data);
        sem_post(&sem_lock);
        // pthread_t is an opaque type; cast explicitly to match %lu.
        printf(" %lu product %04d %6d\n", (unsigned long)pthread_self(), data, q->Size());
        usleep(10000);
    }
    return NULL;
}
void* thr_con(void* arg) {
RingQueue* q = (RingQueue*)arg;
while (1) {
int data;
q->Pop(&data);
printf("%lu consume %04d %6d\n", pthread_self(), data, q->Size());
usleep(10000);
}
return NULL;
}
int main() {
RingQueue* q = new RingQueue;
sem_init(&sem_lock, 0, 1);
pthread_t pro_tid[4], con_tid[4];
for (int i = 0; i < 4; ++i) {
int ret = pthread_create(&pro_tid[i], NULL, thr_pro, (void*)q);
if (ret != 0) {
perror("pthread_create error~~\n");
return -1;
}
ret = pthread_create(&con_tid[i], NULL, thr_con, (void*)q);
if (ret != 0) {
perror("pthread_create error~~\n");
return -1;
}
}
for (int i = 0; i < 4; ++i) {
pthread_join(pro_tid[i], NULL);
pthread_join(con_tid[i], NULL);
}
sem_destroy(&sem_lock);
return 0;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)