设计模式-生产者与消费者模式

2023-05-16

        最近正在看C++日志的开源代码,其中多个线程需要向文件中写入日志信息,该将该算法逻辑抽象出来的话就是生产者与消费者设计模式。常见的生产者与消费者模式主要分为四类:单生产者与单消费者模式、单生产者与多消费者模式、多生产者与单消费者模式以及多生产者与多消费这模式。下面将以此对上述的四种生产者与消费者模式进行分析。

1 单生产者与单消费者模式

        在讲设计模式之前,需要明确什么是PV操作。PV操作是在多线程之间实现互斥与同步的算法。P操作(wait)是指只有当信号量大于等于0的时候,子线程才能够执行后续的代码,否则将会在此进行堵塞,并将信号量递减;V操作(signal)是指对信号量进行递增,然后执行后续的代码。

        在单生产者与单消费者模式中,可以定义empty和full两个信号量来表示容器中当前的状态。在生产者中,如果empty的位置小于0的时候(即容器已经满了),生产者线程就在此进行等待,直到容器中有有剩余的位置可用于存放物品。在消费者中,如果当前full的位置小于等于0的时候(即容器为空的时候),消费者线程就在此等待,直到容器中有可消费的物品时。

        上面算法的实现方式如下面的代码所示:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>

using namespace std;

static const int numbers = 10;  // 容器能够承载产品的个数
static int idx = 0;       // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
    int container[numbers];
    mutex _mutex;
    condition_variable full;
    condition_variable empty;
} item;
typedef struct Item Item;

void producer(Item* item, const int value){
    // 如果容器满了的话,就等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx >= numbers){
        cout << "producer is full" << endl;
        (item->full).wait(lock);
    }
    cout << "producer: " << value << endl;
    item->container[idx++] = value;
    (item->empty).notify_all();
}

void consumer(Item* item){
    // 如果容器为空的话,就进行等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx < 0){
        cout << "consumer is empty" << endl;
        (item->empty).wait(lock);
    }
    cout << "consumer: " << item->container[--idx] << endl;
    (item->full).notify_all();
}

// 进行生产与消费
void run_producer(){
    while(1){
        sleep(1);
        producer(&item, 1);
    }
}
void run_consumer(){
    for(int i = 0; i < 5; ++i){
        sleep(1);
        consumer(&item);
    }
}

int main(){
    thread thread_producer(run_producer);
    thread thread_consumer(run_consumer);
    thread_producer.join();
    thread_consumer.join();
    return 0;
}

        在实际应用过程中,上述代码运行的结果如下图所示,不难发现在代码执行的过程中,生产者与消费者能够较好的对临界资源做到互斥访问,避免容器为空消费者对其进行消费,或者容器已满,生产者依旧对其进行生产的现象出现。

 下面是上述代码需要补充的知识点:

1.1 sleep()函数

        sleep()函数是c语言中用于对线程休眠的函数,传入参数以秒作为单位。在使用过程中需要导入<unistd.h>头文件。

1.2 lock_guard和unique_lock的区别

        lock_guard:lock_guard对象在创建的时候实现对锁进行上锁,并在该对象销毁的时候将所进行开锁。这相比于直接对mutex对象进行lock和unlock要方便很多,减轻了程序员维护多线程的压力。

        unique_lock:unique_lock比lock_guard要灵活很多,该对象可以控制上锁与解锁的时机,并且还允许线程进行等待。unique_lock经常配合condition_variable信号量对锁进行等待(wait()函数)以及释放锁(notify_all()函数)等操作。

2 单生产者多消费者模式

        有了单生产者单消费者模式的基础之后,单生产者多消费者模式只是在上述模式上的扩展。生产者与消费者的逻辑代码不变,只是在多个消费者访问临界区的时候需要加一个互斥量,从而保证只能有一个线程消费物品。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>

// 单生产者与多消费者模式
// 需要在单消费模式之上再添加一个mutex互斥量,只允许一个消费者在一个时间段内消费物品
using namespace std;

static const int numbers = 10;  // 容器能够承载产品的个数
static int idx = 0;       // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
    int container[numbers];
    mutex _mutex;
    mutex _consumer_mutex;
    condition_variable full;
    condition_variable empty;
} item;
typedef struct Item Item;

void producer(Item* item, const int value){
    // 如果容器满了的话,就等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx >= numbers){
        cout << "producer is full" << endl;
        (item->full).wait(lock);
    }
    // 上述的while函数可以改成下面的形式更好理解
    // 为了保持和b站视频提供的视频一致,后续的代码都以while的形式为准,但是这两种形式都没错
    //cout << "producer is full" << endl;
    //(item->full).wait(lock,[&]{
    //    if(idx >= numbers) 
    //        return false;
    //    return true; });
    //cout << "producer: " << value << "; id = " << this_thread::get_id()<< endl; 
    item->container[idx++] = value;
    (item->empty).notify_all();
}

void consumer(Item* item){
    // 如果容器为空的话,就进行等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx < 0){
        cout << "consumer is empty!" << endl;
        (item->empty).wait(lock);
    }
    cout << "consumer: " << item->container[--idx] << "; id = " << this_thread::get_id()<<endl; 
    (item->full).notify_all();
}

// 进行生产与消费
void run_producer(){
    while(1){
        sleep(1);
        producer(&item, 1);
    }
}
void run_consumer(){
    for(int i = 0; i < 5; ++i){
        sleep(1);
        unique_lock<mutex> lock(item._consumer_mutex);
        consumer(&item);
    }
}

int main(){
    thread thread_producer(run_producer);
    thread thread_consumer1(run_consumer);
    thread thread_consumer2(run_consumer);
    thread thread_consumer3(run_consumer);
    thread_producer.join();
    thread_consumer1.join();
    thread_consumer2.join();
    thread_consumer3.join();
    return 0;
}

        下面是上述代码执行的结果:

3 多生产者单消费者模式

        多生产者单消费者模式其实就是在上一个单生产者多消费者模式上的拓展,只是需要在多个消费者处理进程中添加一个互斥量,保证在生产产品的时候只能有一个线程进行访问。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>

// 单生产者与多消费者模式
// 需要在单消费模式之上再添加一个mutex互斥量,只允许一个消费者在一个时间段内消费物品
using namespace std;

static const int numbers = 10;  // 容器能够承载产品的个数
static int idx = 0;       // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
    int container[numbers];
    mutex _mutex;
    mutex _consumer_mutex;
    mutex _producer_mutex;
    condition_variable full;
    condition_variable empty;
} item;
typedef struct Item Item;

void producer(Item* item){
    // 如果容器满了的话,就等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx >= numbers){
        cout << "producer is full" << endl;
        (item->full).wait(lock);
    }
    int value = rand() % 100;
    cout << "producer: " << value << "; id = " << this_thread::get_id()<< endl; 
    item->container[idx++] = value;
    (item->empty).notify_all();
}

void consumer(Item* item){
    // 如果容器为空的话,就进行等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx < 0){
        cout << "consumer is empty!" << endl;
        (item->empty).wait(lock);
    }
    cout << "consumer: " << item->container[--idx] << "; id = " << this_thread::get_id()<<endl; 
    (item->full).notify_all();
}

// 进行生产与消费
void run_producer(){
    while(1){
        sleep(1);
        unique_lock<mutex> lock(item._producer_mutex);
        producer(&item);
    }
}
void run_consumer(){
    for(int i = 0; i < 5; ++i){
        sleep(1);
        //unique_lock<mutex> lock(item._consumer_mutex);
        consumer(&item);
    }
}

int main(){
    thread thread_producer1(run_producer);
    thread thread_producer2(run_producer);
    thread thread_producer3(run_producer);
    thread thread_consumer(run_consumer);
    thread_producer1.join();
    thread_producer2.join();
    thread_producer3.join();
    thread_consumer.join();
    return 0;
}

        下面就是代码运行的结果:

 4 多生产者多消费者模式

        多生产者多消费者模式其实就是上述三种模式的扩展。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>

// 单生产者与多消费者模式
// 需要在单消费模式之上再添加一个mutex互斥量,只允许一个消费者在一个时间段内消费物品
using namespace std;

static const int numbers = 10;  // 容器能够承载产品的个数
static int idx = 0;       // 当前物体存放的索引
// 生产者与消费者模式
struct Item{
    int container[numbers];
    mutex _mutex;
    mutex _consumer_mutex;
    mutex _producer_mutex;
    condition_variable full;
    condition_variable empty;
} item;
typedef struct Item Item;

void producer(Item* item){
    // 如果容器满了的话,就等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx >= numbers){
        cout << "producer is full" << endl;
        (item->full).wait(lock);
    }
    int value = rand() % 100;
    cout << "producer: " << value << "; id = " << this_thread::get_id()<< endl; 
    item->container[idx++] = value;
    (item->empty).notify_all();
}

void consumer(Item* item){
    // 如果容器为空的话,就进行等待
    unique_lock<mutex> lock(item->_mutex);
    while(idx < 0){
        cout << "consumer is empty!" << endl;
        (item->empty).wait(lock);
    }
    cout << "consumer: " << item->container[--idx] << "; id = " << this_thread::get_id()<<endl; 
    (item->full).notify_all();
}

// 进行生产与消费
void run_producer(){
    while(1){
        sleep(1);
        unique_lock<mutex> lock(item._producer_mutex);
        producer(&item);
    }
}
void run_consumer(){
    for(int i = 0; i < 5; ++i){
        sleep(1);
        unique_lock<mutex> lock(item._consumer_mutex);
        consumer(&item);
    }
}

int main(){
    thread thread_producer1(run_producer);
    thread thread_producer2(run_producer);
    thread thread_producer3(run_producer);
    thread thread_consumer1(run_consumer);
    thread thread_consumer2(run_consumer);
    thread_producer1.join();
    thread_producer2.join();
    thread_producer3.join();
    thread_consumer1.join();
    thread_consumer2.join();
    return 0;
}

        下面就是最终的运行效果:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

设计模式-生产者与消费者模式 的相关文章

  • 解决git fatal:无法找到‘https‘的远程助手

    解决git fatal 无法找到 https 的远程助手 1 问题 今天使用git拉去代码的时候出现 fatal 无法找到 39 https 39 的远程助手错误 xff0c 如下所示 span class token function g
  • [Android Framework]Android 11系统Update-API时Lint检查问题解决和记录

    1 什么是Lint检查 Android Lint 是 ADT 16 xff08 和工具 16 xff09 中引入的一个新工具 xff0c 用于扫描 Android 项目源以查找潜在的错误 Android11之前 xff0c 我们在进行Fra
  • openEuler22.03LTS网卡配置

    VmWare完成安装openEuler xff0c 修改网卡配置文件 xff0c 重启network报错service not found xff0c 因为欧拉使用nmcli管理网络 按照centos7的经验 xff0c 修改ifcfg配置
  • 利用在线词典批量查询英语单词

    进来遇到很多英语生词 xff0c 工具书上给的解释错误百出 xff0c 而很多在线词典不但可以给出某个单词的解释 xff0c 而且有大量的示例 xff0c 因此猜想利用在线词典批量查询这些单词 怎么实现呢 xff1f 首要问题是如何自动获取
  • linux svn服务器搭建 centos 搭建svn服务器

    本文是在CentOS中采用yum安装方式 优点 xff1a 简单 xff0c 一键安装 xff0c 不用手动配置环境变量等 缺点 xff1a 安装位置为yum默认 xff0c 比如我们公司服务器上安装软件有自己的规定 xff0c 一般会采用
  • Firewall 防火墙常用命令

    Firewall开启常见端口命令 xff1a 注意 permanent意思是 永久生效 firewall cmd zone 61 public add port 61 80 tcp permanent firewall cmd zone 6
  • 第二章——keil5修改工程名字

    第一章 stm32f103建立工程 第二章 keil5修改工程名字 目录 1 修改模板文件名 2 修改工程文件名 3 删除中间文件 4 修改输出中间变量文件名 5 点击编译 xff0c 改名成功 1 修改模板文件名 把第一章建立的工程模板的
  • origin2021如何切换中文界面

    origin2021如何切换中文界面 一 直接设置Change Language二 Change Language菜单是灰色的 一 直接设置Change Language 1 单击 Help gt Change Language 2 将La
  • fbe 业务流程分析

    参考链接 xff1a https www cnblogs com bobfly1984 p 14090078 html 总结 根据 data unencrypted key和 data misc vold user keys de 0 路径
  • js的字符串匹配方法match()和Java的字符串匹配方法matches()的使用?以换行符替换为其他字符为例

    js的字符串匹配方法match 和Java的字符串匹配方法matches 的使用 xff1f 以换行符替换为其他字符为例 js的 xff1a str match n igm length会返回str中有多少个换行str match bc i
  • UNIX 环境高级编程

    与你共享 xff0c 与你共舞 xff01 UNIX环境高级编程 xff08 第3版 xff09 是被誉为UNIX编程 圣经 xff1b 书中除了介绍UNIX文件和目录 标准I O库 系统数据文件和信息 进程环境 进程控制 进程关系 信号
  • 华为服务器WebBios创建磁盘阵列

    步骤 1 启动服务器按ctrl 43 h进入WebBios 2 点击Start确定进入下一步 3 左栏的Configuration Wizard添加raid 4 选New Configuration新建raid即可 5 选中硬盘 然后再按N
  • goland 无法编译输出 Compilation finished with exit code 0

    golang编写程序无法输出
  • 分享关于AI的那些事儿

    机器人很厉害 给人治病的ibm 的Watson 沃森 击败世界围棋冠军的AlphaGo阿尔法狗 陪你聊天的机器人 数据标注 木马识别 恶意访问拦截 智能家居 但是17年首次出现了机器人获得国籍 这个机器人叫做索菲亚 这是一个类似人类的机器人
  • String Evolver, My First Genetic Algorithm

    When reading Evolutionary Computation for Modeling and Optimization 1 I found following problem in section 1 2 3 A strin
  • MongoDB特点及功能介绍

    一 MongoDB 介绍 1 基本概念 MongoDB是一个高性能 xff0c 开源 xff0c 无模式的文档型数据库 xff0c 是当前NoSQL数据库产品中最热门的一种 它在许多场景下可用于替代传统的关系型数据库或键 值存储方式 xff
  • 线程同步以及线程调度相关的方法

    wait xff1a 使一个线程处于等待 xff08 阻塞 xff09 状态 xff0c 并且释放所持有的对象的锁 xff1b sleep xff1a 使一个正在运行的线程处于睡眠状态 xff0c 是一个静态方法 xff0c 调用此方法要处
  • 智能医疗辅助诊断——调查与思考

    背景 为什么要做智能医疗 xff1f 优质医疗资源不足且增长缓慢各地方医疗资源分配不均客观条件满足 xff0c 人工智能技术发展 xff0c 算法 算力 数据齐备 目录 指出 xff0c 医用软件按照预期用途分为辅助诊断类和治疗类 诊断功能
  • WebMvcConfigurer配置HandlerInterceptor拦截器失效

    1 前言 Springboot2 0之前 xff0c 实现拦截器功能的配置类是通过继承 extends WebMvcConfigurerAdapter类完成的 xff0c 最近项目把Springboot升级到了Springboot2 X x
  • ubuntu deepin wechat中文乱码解决

    deepin wechat 中文乱码解决方案 方案一 执行以下命令打开文件 gedit opt deepinwine tools run sh 找到WINE CMD 修改为 WINE CMD span class token operato

随机推荐