最近正在看C++日志的开源代码,其中多个线程需要向文件中写入日志信息,该将该算法逻辑抽象出来的话就是生产者与消费者设计模式。常见的生产者与消费者模式主要分为四类:单生产者与单消费者模式、单生产者与多消费者模式、多生产者与单消费者模式以及多生产者与多消费这模式。下面将以此对上述的四种生产者与消费者模式进行分析。
1 单生产者与单消费者模式
在讲设计模式之前,需要明确什么是PV操作。PV操作是在多线程之间实现互斥与同步的算法。P操作(wait)是指只有当信号量大于等于0的时候,子线程才能够执行后续的代码,否则将会在此进行堵塞,并将信号量递减;V操作(signal)是指对信号量进行递增,然后执行后续的代码。
在单生产者与单消费者模式中,可以定义empty和full两个信号量来表示容器中当前的状态。在生产者中,如果empty的位置小于0的时候(即容器已经满了),生产者线程就在此进行等待,直到容器中有有剩余的位置可用于存放物品。在消费者中,如果当前full的位置小于等于0的时候(即容器为空的时候),消费者线程就在此等待,直到容器中有可消费的物品时。
上面算法的实现方式如下面的代码所示:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
using namespace std;
static const int numbers = 10; // 容器能够承载产品的个数
static int idx = 0; // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
int container[numbers];
mutex _mutex;
condition_variable full;
condition_variable empty;
} item;
typedef struct Item Item;
void producer(Item* item, const int value){
// 如果容器满了的话,就等待
unique_lock<mutex> lock(item->_mutex);
while(idx >= numbers){
cout << "producer is full" << endl;
(item->full).wait(lock);
}
cout << "producer: " << value << endl;
item->container[idx++] = value;
(item->empty).notify_all();
}
void consumer(Item* item){
// 如果容器为空的话,就进行等待
unique_lock<mutex> lock(item->_mutex);
while(idx < 0){
cout << "consumer is empty" << endl;
(item->empty).wait(lock);
}
cout << "consumer: " << item->container[--idx] << endl;
(item->full).notify_all();
}
// 进行生产与消费
void run_producer(){
while(1){
sleep(1);
producer(&item, 1);
}
}
void run_consumer(){
for(int i = 0; i < 5; ++i){
sleep(1);
consumer(&item);
}
}
int main(){
thread thread_producer(run_producer);
thread thread_consumer(run_consumer);
thread_producer.join();
thread_consumer.join();
return 0;
}
在实际应用过程中,上述代码运行的结果如下图所示,不难发现在代码执行的过程中,生产者与消费者能够较好的对临界资源做到互斥访问,避免容器为空消费者对其进行消费,或者容器已满,生产者依旧对其进行生产的现象出现。
下面是上述代码需要补充的知识点:
1.1 sleep()函数
sleep()函数是c语言中用于对线程休眠的函数,传入参数以秒作为单位。在使用过程中需要导入<unistd.h>头文件。
1.2 lock_guard和unique_lock的区别
lock_guard:lock_guard对象在创建的时候实现对锁进行上锁,并在该对象销毁的时候将所进行开锁。这相比于直接对mutex对象进行lock和unlock要方便很多,减轻了程序员维护多线程的压力。
unique_lock:unique_lock比lock_guard要灵活很多,该对象可以控制上锁与解锁的时机,并且还允许线程进行等待。unique_lock经常配合condition_variable信号量对锁进行等待(wait()函数)以及释放锁(notify_all()函数)等操作。
2 单生产者多消费者模式
有了单生产者单消费者模式的基础之后,单生产者多消费者模式只是在上述模式上的扩展。生产者与消费者的逻辑代码不变,只是在多个消费者访问临界区的时候需要加一个互斥量,从而保证只能有一个线程消费物品。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
// 单生产者与多消费者模式
// 需要在单消费模式之上再添加一个mutex互斥量,只允许一个消费者在一个时间段内消费物品
using namespace std;
static const int numbers = 10; // 容器能够承载产品的个数
static int idx = 0; // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
int container[numbers];
mutex _mutex;
mutex _consumer_mutex;
condition_variable full;
condition_variable empty;
} item;
typedef struct Item Item;
void producer(Item* item, const int value){
// 如果容器满了的话,就等待
unique_lock<mutex> lock(item->_mutex);
while(idx >= numbers){
cout << "producer is full" << endl;
(item->full).wait(lock);
}
// 上述的while函数可以改成下面的形式更好理解
// 为了保持和b站视频提供的视频一致,后续的代码都以while的形式为准,但是这两种形式都没错
//cout << "producer is full" << endl;
//(item->full).wait(lock,[&]{
// if(idx >= numbers)
// return false;
// return true; });
//cout << "producer: " << value << "; id = " << this_thread::get_id()<< endl;
item->container[idx++] = value;
(item->empty).notify_all();
}
void consumer(Item* item){
// 如果容器为空的话,就进行等待
unique_lock<mutex> lock(item->_mutex);
while(idx < 0){
cout << "consumer is empty!" << endl;
(item->empty).wait(lock);
}
cout << "consumer: " << item->container[--idx] << "; id = " << this_thread::get_id()<<endl;
(item->full).notify_all();
}
// 进行生产与消费
void run_producer(){
while(1){
sleep(1);
producer(&item, 1);
}
}
void run_consumer(){
for(int i = 0; i < 5; ++i){
sleep(1);
unique_lock<mutex> lock(item._consumer_mutex);
consumer(&item);
}
}
int main(){
thread thread_producer(run_producer);
thread thread_consumer1(run_consumer);
thread thread_consumer2(run_consumer);
thread thread_consumer3(run_consumer);
thread_producer.join();
thread_consumer1.join();
thread_consumer2.join();
thread_consumer3.join();
return 0;
}
下面是上述代码执行的结果:
3 多生产者单消费者模式
多生产者单消费者模式其实就是在上一个单生产者多消费者模式上的拓展,只是需要在多个消费者处理进程中添加一个互斥量,保证在生产产品的时候只能有一个线程进行访问。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
// 单生产者与多消费者模式
// 需要在单消费模式之上再添加一个mutex互斥量,只允许一个消费者在一个时间段内消费物品
using namespace std;
static const int numbers = 10; // 容器能够承载产品的个数
static int idx = 0; // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
int container[numbers];
mutex _mutex;
mutex _consumer_mutex;
mutex _producer_mutex;
condition_variable full;
condition_variable empty;
} item;
typedef struct Item Item;
void producer(Item* item){
// 如果容器满了的话,就等待
unique_lock<mutex> lock(item->_mutex);
while(idx >= numbers){
cout << "producer is full" << endl;
(item->full).wait(lock);
}
int value = rand() % 100;
cout << "producer: " << value << "; id = " << this_thread::get_id()<< endl;
item->container[idx++] = value;
(item->empty).notify_all();
}
void consumer(Item* item){
// 如果容器为空的话,就进行等待
unique_lock<mutex> lock(item->_mutex);
while(idx < 0){
cout << "consumer is empty!" << endl;
(item->empty).wait(lock);
}
cout << "consumer: " << item->container[--idx] << "; id = " << this_thread::get_id()<<endl;
(item->full).notify_all();
}
// 进行生产与消费
void run_producer(){
while(1){
sleep(1);
unique_lock<mutex> lock(item._producer_mutex);
producer(&item);
}
}
void run_consumer(){
for(int i = 0; i < 5; ++i){
sleep(1);
//unique_lock<mutex> lock(item._consumer_mutex);
consumer(&item);
}
}
int main(){
thread thread_producer1(run_producer);
thread thread_producer2(run_producer);
thread thread_producer3(run_producer);
thread thread_consumer(run_consumer);
thread_producer1.join();
thread_producer2.join();
thread_producer3.join();
thread_consumer.join();
return 0;
}
下面就是代码运行的结果:
4 多生产者多消费者模式
多生产者多消费者模式其实就是上述三种模式的扩展。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
// 单生产者与多消费者模式
// 需要在单消费模式之上再添加一个mutex互斥量,只允许一个消费者在一个时间段内消费物品
using namespace std;
static const int numbers = 10; // 容器能够承载产品的个数
static int idx = 0; // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
int container[numbers];
mutex _mutex;
mutex _consumer_mutex;
mutex _producer_mutex;
condition_variable full;
condition_variable empty;
} item;
typedef struct Item Item;
void producer(Item* item){
// 如果容器满了的话,就等待
unique_lock<mutex> lock(item->_mutex);
while(idx >= numbers){
cout << "producer is full" << endl;
(item->full).wait(lock);
}
int value = rand() % 100;
cout << "producer: " << value << "; id = " << this_thread::get_id()<< endl;
item->container[idx++] = value;
(item->empty).notify_all();
}
void consumer(Item* item){
// 如果容器为空的话,就进行等待
unique_lock<mutex> lock(item->_mutex);
while(idx < 0){
cout << "consumer is empty!" << endl;
(item->empty).wait(lock);
}
cout << "consumer: " << item->container[--idx] << "; id = " << this_thread::get_id()<<endl;
(item->full).notify_all();
}
// 进行生产与消费
void run_producer(){
while(1){
sleep(1);
unique_lock<mutex> lock(item._producer_mutex);
producer(&item);
}
}
void run_consumer(){
for(int i = 0; i < 5; ++i){
sleep(1);
unique_lock<mutex> lock(item._consumer_mutex);
consumer(&item);
}
}
int main(){
thread thread_producer1(run_producer);
thread thread_producer2(run_producer);
thread thread_producer3(run_producer);
thread thread_consumer1(run_consumer);
thread thread_consumer2(run_consumer);
thread_producer1.join();
thread_producer2.join();
thread_producer3.join();
thread_consumer1.join();
thread_consumer2.join();
return 0;
}
下面就是最终的运行效果:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)