本篇文章将用c++的面向对象思想,实现生产者消费者模型的代码编写。
个人感悟:
1、多生产者多消费者模型的共享竞争关系:
对缓冲区的共享和竞争:
生产者和消费者都需要对共享缓冲区进行操作,存入产品或消费产品。
mtx二值信号量和条件信号量的搭配使用,可以使得生产者或消费者在
第一时间获取到共享缓冲区的非满、非空的情况。同时如果缓冲区满或空
时则停止对应的生产者或消费者的操作,防止死锁情况的发生。
生产者对生产计数器的竞争:
此时的竞争是生产者内部的竞争,多个生产者同时生产,生产一个则计数器++,同时每一次生产的
产品在缓冲区中存入的位置都是与计数器当前值紧密相关的,用“produce_mutex”信号量锁住对
produce_item_count变量的访问权限,可以保证多个生产者对生产的产品的有序装入,不会发生错乱。
消费者对消费计数器的竞争:
同上
2、使用面向对象思想构建生产者消费者模型时,则将生产者消费者行为抽象为一个工厂模型:
工厂具有仓库:就是共享缓冲区;
工厂可以生产:就是生产者的工作内容;
工厂可以出货:就是消费者的工作内容;
因此可以利用工厂类和仓库类实现一个具有面向对象思想的生产者消费者运行代码。
代码实现如下:(IDE : VS2017)
#include "stdafx.h"
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include<windows.h>
#include<stdlib.h>
using namespace std;
static const int kItemRepositorySize = 10; // Item buffer size.
static const int kItemsToProduce = 100; // How many items we plan to produce.
template<class T>
class Repository {
public:
int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列.
size_t read_position; // 消费者读取产品位置.
size_t write_position; // 生产者写入产品位置.
std::mutex item_counter_mtx_for_producer;//互斥量,保证多个生产者对缓冲区的互斥访问
size_t item_counter_for_producer;
//生产者计数变量
std::mutex item_counter_mtx_for_consumer;//互斥量,保证多个消费者对缓冲区的互斥访问
size_t item_counter_for_consumer;
//消费者计数变量
std::mutex mtx;
// 互斥量,保护产品缓冲区
std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满.
std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空.
Repository()
{
read_position = 0;
write_position = 0;
item_counter_for_producer = 0;
item_counter_for_consumer = 0;
};
void Init()
{
read_position = 0;
write_position = 0;
item_counter_for_producer = 0;
item_counter_for_consumer = 0;
}
};
template<class T>
class Factory
{
private:
//共享缓冲区对象
Repository<T> repo;
//生产者生产函数
void ProduceItem(Repository<T> &repo, T item)
{
std::unique_lock<std::mutex> lock(repo.mtx);
while (((repo.write_position + 1) % kItemRepositorySize) == repo.read_position)
{ // item buffer is full, just wait here.
std::cout << "Producer is waiting for an repo_not_full notification...\n";
(repo.repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
}
(repo.item_buffer)[repo.write_position] = item; // 写入产品.
(repo.write_position)++; // 写入位置后移.
std::cout << "Producer thread: " << std::this_thread::get_id() << "is producing the:" << item << "^th item..." << std::endl;
(repo.item_counter_for_producer)++;
if (repo.write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
repo.write_position = 0;
(repo.repo_not_empty).notify_all(); // 通知消费者产品库不为空.
lock.unlock(); // 解锁.
}
//消费者消费函数
T ConsumeItem(Repository<T> &repo)
{
T data;
std::unique_lock<std::mutex> lock(repo.mtx);
// item buffer is empty, just wait here.
while (repo.write_position == repo.read_position)
{
std::cout << "Consumer is waiting for an repo_not_empty notification...\n";
(repo.repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
}
data = (repo.item_buffer)[repo.read_position]; // 读取某一产品
(repo.read_position)++; // 读取位置后移
std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is consuming the " << data << "^th item" << std::endl;
if (repo.read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
repo.read_position = 0;
(repo.repo_not_full).notify_all(); // 通知生产者产品库不为满.
lock.unlock(); // 解锁.
return data; // 返回产品.
}
public:
void Reset()
{
repo.Init();
}
void ProduceTask()
{
bool ready_to_exit = false;
while (1)
{
Sleep(1);
std::unique_lock<std::mutex>lock(repo.item_counter_mtx_for_producer);
if (repo.item_counter_for_producer<kItemsToProduce)
{
ProduceItem(repo, repo.item_counter_for_producer);
}
else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) {
break;
}
}
cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << endl;
}
void ConsumeTask() {
bool ready_to_exit = false;
while (1)
{
Sleep(1);
std::unique_lock<std::mutex> lock(repo.item_counter_mtx_for_consumer);
if (repo.item_counter_for_consumer < kItemsToProduce)
{
T item = ConsumeItem(repo); // 消费一个产品.
++(repo.item_counter_for_consumer);
}
else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) break;
}
cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << endl;
}
};
int main() {
cout << "Main thread id :" << this_thread::get_id() << endl;
class Factory<int> myfactory;
thread producer1(&Factory<int>::ProduceTask, &myfactory);
thread producer2(&Factory<int>::ProduceTask, &myfactory);
thread producer3(&Factory<int>::ProduceTask, &myfactory);
thread consumer1(&Factory<int>::ConsumeTask, &myfactory);
thread consumer2(&Factory<int>::ConsumeTask, &myfactory);
thread consumer3(&Factory<int>::ConsumeTask, &myfactory);
producer1.join();
producer2.join();
producer3.join();
consumer1.join();
consumer2.join();
consumer3.join();
system("pause");
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)