生产者消费者模型(代码实现)

2023-05-16

一个场所: 队列

两个角色: 生产者、消费者

三种关系:
生产者-生产者:互斥
消费者-消费者:互斥
生产者-消费者:同步+互斥

如果生产者直接将数据交给消费,那么代码的耦合性将会非常强,而且如果生产数据特别快而消耗数据特别慢将会造成数据堆积,数据可能会丢失
因此利用 场所,生产者将数据放到场所中,消费者从场所中拿取数据

场所的作用:
解耦合
支持忙闲不均
支持并发
在这里插入图片描述

条件变量实现

/*==============================================================
*    Copyright . All rights reserved.
*    Filename:  pro_con.cpp
*    Author:    shen
*    Last modified: 2019-06-26 16:57
*    Description: 条件变量实现生产者消费者模型
==============================================================*/
#include <cstdio>
#include <queue>
#include <unistd.h>
#include <pthread.h>

#define MAX_QUEUE 5

class BlockQueue {
    public:
        BlockQueue() {
            pthread_mutex_init(&_mutex, NULL);
            pthread_cond_init(&_cond_pro, NULL);
            pthread_cond_init(&_cond_con, NULL);
        }
        ~BlockQueue() {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_cond_pro);
            pthread_cond_destroy(&_cond_con);
        }
        void Push(int data) {
            //加锁,队列满了则等待,生产数据,解锁
            pthread_mutex_lock(&_mutex);
            while (_queue.size() == MAX_QUEUE) {
                pthread_cond_wait(&_cond_pro, &_mutex);
            }
            _queue.push(data);
            pthread_cond_signal(&_cond_con);
            pthread_mutex_unlock(&_mutex);
        }
        //data 输出型参数,返回删除的数据
        void Pop(int *data) {
        	//加锁,队列为空则等待,消费数据,解锁
            pthread_mutex_lock(&_mutex);
            while (_queue.empty()) {
                pthread_cond_wait(&_cond_con, &_mutex);
            }
            *data = _queue.front();
            _queue.pop();
            pthread_cond_signal(&_cond_pro);
            pthread_mutex_unlock(&_mutex);
        }
        int Size() {
            return _queue.size();
        }
    private:
        std::queue<int> _queue; 
        pthread_mutex_t _mutex;
        pthread_cond_t _cond_pro;
        pthread_cond_t _cond_con;
};
//保证资源++是原子操作(多个生产者同时生产数据)
pthread_mutex_t mut; 
int resources = 0;
void* thr_pro(void* arg) {
    BlockQueue* q = (BlockQueue*)arg;

    while (1) {
        pthread_mutex_lock(&mut);
        q->Push(resources++);
        pthread_mutex_unlock(&mut);
        printf("   %lu product %04d %6d\n", pthread_self(), resources, q->Size());
        usleep(200000);
    }

    return NULL; 
}

void* thr_con(void* arg) {
    BlockQueue* q = (BlockQueue*)arg;

    while (1) {
        int data;
        q->Pop(&data);
        printf("%lu consume %04d %6d\n", pthread_self(), data, q->Size());
        usleep(200000);
    }

    return NULL;
}

int main() {
    BlockQueue* q = new BlockQueue;
    pthread_mutex_init(&mut, NULL);

    //四个消费者四个生产者
    pthread_t pro_tid[4], con_tid[4];
    for (int i = 0; i < 4; ++i) {
        int ret = pthread_create(&pro_tid[i], NULL, thr_pro, (void*)q);
        if (ret != 0) {
            perror("pthread_create error~~\n");
            return -1;
        }
        ret = pthread_create(&con_tid[i], NULL, thr_con, (void*)q);
        if (ret != 0) {
            perror("pthread_create error~~\n");
            return -1;
        }
    }

    for (int i = 0; i < 4; ++i) {
        pthread_join(pro_tid[i], NULL);
        pthread_join(con_tid[i], NULL);
    }
    pthread_mutex_destroy(&mut);

    return 0;
}

信号量实现
在这里插入图片描述

/*==============================================================
*    Copyright . All rights reserved.
*    Filename:  sem.cpp
*    Author:    shen
*    Last modified: 2019-06-26 16:57
*    Description: 信号量实现生产者消费者模型
==============================================================*/
#include <vector>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX_QUEUE 5

class RingQueue {
    public:
        RingQueue(int cap = MAX_QUEUE)
            : _queue(5)
            , _front(0)
            , _rear(0)
            , _capacity(cap)
        {
            sem_init(&_sem_lock, 0, 1); //互斥锁只有 0 和 1
            sem_init(&_sem_data, 0, 0);
            sem_init(&_sem_space, 0, _capacity);
        }
        ~RingQueue() {
            sem_destroy(&_sem_lock);
            sem_destroy(&_sem_data);
            sem_destroy(&_sem_space);
        }
        void Push(int data) {
            sem_wait(&_sem_space);
            //先等待空闲空间,有空闲空间才能进行加锁并且对资源进行操作
            //如果先加锁,然后没有空间资源,陷入等待将程序卡死
            sem_wait(&_sem_lock);
            _queue[_front] = data;
            _front = (_front + 1) % _capacity;
            sem_post(&_sem_lock);
            sem_post(&_sem_data);
        }
        void Pop(int *data) {
            sem_wait(&_sem_data);
            sem_wait(&_sem_lock);
            *data = _queue[_rear];
            _rear = (_rear + 1) % _capacity;
            sem_post(&_sem_lock);
            sem_post(&_sem_space);
        }
        int Size() {
            int size = 0;
            int front = _front;
            while (front != _rear) {
                front = (front + 1) % _capacity;
                size++;
            }
            return size;
        }
    private:
        std::vector<int> _queue;
        int _front;
        int _rear;
        int _capacity;
        sem_t _sem_lock;
        sem_t _sem_data;
        sem_t _sem_space;
};

int resources = 0;
//这个锁确保同一时刻只有一个线程进行资源++操作
//因为一旦一个进程拿着数据进入到Push函数中,还没有加锁(意味着也没有进行资源++)
//时间片轮转到了另一个线程,这个线拿着同样的数据(因为刚刚的线程还没有进行++操作),此时这个线程进行了++操作
//这种情况下,同样数据被加载了两次,因此需要保证Push的操作原子性
sem_t sem_lock;

void* thr_pro(void* arg) {
    RingQueue* q = (RingQueue*)arg;

    while (1) {
        sem_wait(&sem_lock);
        q->Push(resources++);
        sem_post(&sem_lock);
        printf("   %lu product %04d %6d\n", pthread_self(), resources, q->Size());
        usleep(10000);
    }

    return NULL; 
};

void* thr_con(void* arg) {
    RingQueue* q = (RingQueue*)arg;

    while (1) {
        int data;
        q->Pop(&data);
        printf("%lu consume %04d %6d\n", pthread_self(), data, q->Size());
        usleep(10000);
    }

    return NULL;
}

int main() {
    RingQueue* q = new RingQueue;
    sem_init(&sem_lock, 0, 1);

    //四个消费者四个生产者
    pthread_t pro_tid[4], con_tid[4];
    for (int i = 0; i < 4; ++i) {
        int ret = pthread_create(&pro_tid[i], NULL, thr_pro, (void*)q);
        if (ret != 0) {
            perror("pthread_create error~~\n");
            return -1;
        }
        ret = pthread_create(&con_tid[i], NULL, thr_con, (void*)q);
        if (ret != 0) {
            perror("pthread_create error~~\n");
            return -1;
        }
    }

    for (int i = 0; i < 4; ++i) {
        pthread_join(pro_tid[i], NULL);
        pthread_join(con_tid[i], NULL);
    }
    sem_destroy(&sem_lock);

    return 0;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

生产者消费者模型(代码实现) 的相关文章

  • 有意思的Windows脚本(1)

    有意思的Windows脚本 1 因为不知道今天的博客写什么啦 xff0c 就放几个好玩的Windows脚本的源码吧 xff0c 大家千万不要干坏事情哦 xff0c 嘿嘿 1 vbs循环 xff08 桌面上建一个记事本 xff0c 输入下面代
  • 程序员3年5年10年三个阶段

    第一阶段 三年 三年对于程序员来说是第一个门槛 xff0c 这个阶段将会淘汰掉一批不适合写代码的人 这一阶段 xff0c 我们走出校园 xff0c 迈入社会 xff0c 成为一名程序员 xff0c 正式从书本上的内容迈向真正的企业级开发 我
  • 使用 matplotlib 轻松制作动画

    https www codenong com e264872efa062c7d6955 该链接讲了如何使用 matplotlib 轻松制作动画 xff0c 很好用
  • C#中使用IMemoryCache实现内存缓存

    1 缓存基础知识 缓存是实际工作中非常常用的一种提高性能的方法 缓存可以减少生成内容所需的工作 xff0c 从而显著提高应用程序的性能和可伸缩性 缓存最适用于不经常更改的数据 通过缓存 xff0c 可以比从原始数据源返回的数据的副本速度快得
  • 2021-09-13使用@Slf4j报错 程序包org.slf4j不存在

    导入两个maven依赖 然后就OK了 span class token tag span class token tag span class token punctuation lt span dependency span span c
  • PowerShell7.X的安装与美化

    参考链接1 xff1a https blog csdn net qq 39537898 article details 117411132参考链接2 xff1a https sspai com post 59380 很有参考价值 xff0c
  • Lab2 p3 围棋吃子的算法实现

    简单介绍下框架 xff1a 1 xff0e 声明一维数组block 作为一个临时变量记录一个块的大小 xff0c 声明一个整型blockLength记录这个块的长度 2 xff0e kill 为吃子的主函数 recersion int i
  • Python爬取皮皮虾视频

    背景 xff1a 今天闲着没事做 xff0c 然后想着刷刷视频 xff0c 然后发现前段时间学习了一下网络爬虫的一些基本应用 xff0c 就想着利用爬虫到网上去爬取一点视频来模拟人为的点击 下载操作 因为皮皮虾是手机端的app xff0c
  • 解决Result Maps collection already contains value for...BaseResultMap问题

    使用generatorSqlmapCustom逆向工程生成代码报错 假如使用generatorSqlmapCustom逆向工程生成代码 xff0c 即生成dao文件和mapper xml文件 xff0c 复制粘贴至工程中运行报错 Resul
  • IDEA2022.1的一些不常见问题解决方案

    文章目录 IDEA2022 1小问题解决方案 学习的时候尝鲜用了最新版本的IDEA 出现过以下老版本不会遇见的问题 Spring Initializer 创建的项目 无法新建module 显示Directory is already tak
  • 史上最全,Android P指纹应用及源码解析

    简单使用 源码分析 首先需要了解以下几点 指纹识别相关api是在Android23版本添加的 xff0c 所以过早版本的设备是无法使用的 xff1b android span class token punctuation span os
  • RNA-seq数据分析(HISAT2+featureCounts+StringTie)

    RNA seq数据分析 简介1 生物基础1 1 中心法则1 2 RNA seq Protocol1 3 RNA seq总的路线图 2 数据分析2 1 前期准备2 1 1 创建目录 amp 安装conda2 1 2 常用文件格式简介 2 2
  • Lottie动画的优劣及原理

    前言 Lottie是目前应用十分广泛的动画框架 在周会汇报的时候 xff0c 老板问能不能对Lottie进行优化 xff0c 于是就有了下文对Lottie原理的研究 毕竟要进行优化 xff0c 首先要深入了解原理嘛 Lottie实现 Lot
  • 详解微服务技术中进程间通信

    在单体应用中 xff0c 一个组件调用其它组组件时 xff0c 是通过语言级的方法或者函数调用 xff0c 而一个基于微服务的应用是运行于多个服务器上的分布式系统 xff0c 每个服务实例是一个典型的进程 所以 xff0c 如下图显示的 x
  • FusionCompute8.0.0实验(0)CNA及VRM安装(2280v2)

    给公司的华为泰山2280V2服务器安装CNA xff0c arm架构的 xff0c 采用方案为CNA和VRM在一个物理机上 准备文件 xff1a FusionCompute VRM 8 0 0 ARM 64 zip FusionComput
  • 网上买的st7789v3屏幕7脚的不能显示(1)

    今天通过网上购买了一款最便宜的1 3寸液晶显示屏分辨率240x240 xff0c 虽然小了一点 xff0c 但是看起来还不错 xff0c 于是准备了以前的用于驱动st7789的程序 xff0c 连接所有的引脚 xff0c 发现没有cs引脚
  • 新版idea中的terminal会打开windows的power shell窗口

    IDEA升级后发现点击terminal不会像之前一样显示在ide的底部而是会打开windows的Power Shell窗口 xff0c 此时需要找到windows Power Shell的位置右键属性在选项中 xff0c 取消勾选 使用旧版
  • 如何在非/home目录下下载安装vscode-server

    实现目标 xff1a 通过windows端的VSCODE xff0c 利用SSH工具在Ubuntu服务器的非 home目录下在下载安装vscode server 问题 xff1a 服务器 home文件夹剩余空间为0 xff0c 使用SSH工
  • Python 求解最大连通子网络问题

    记录一下不借助networkx包解决寻找最大连通子网络问题 这里没有源码 xff0c 只有问题解析 需要自己动手 这里是关键代码 xff1a span class token keyword for span i in span class
  • @Configuration的使用 和作用

    原文 从Spring3 0 xff0c 64 Configuration用于定义配置类 xff0c 可替换xml配置文件 xff0c 被注解的类内部包含有一个或多个被 64 Bean注解的方法 xff0c 这些方法将会被Annotation

随机推荐

  • @Component和@Configuration

    64 configuration和 64 component之间的区别是 xff1a 64 Component注解的范围最广 xff0c 所有类都可以注解 xff0c 但是 64 Configuration注解一般注解在这样的类上 xff1
  • zookeeper笔记

    ZooKeeper对分布式系统的协调 xff0c 使 共享存储解决分布式系统 临的问题 其实共享存储 xff0c 分布式应 也需要和存储进 络通信 大多数分布式系统中出现的问题 xff0c 都源于信息的共享出了问题 如果各个节点间信息不能及
  • Dubbo

    1 分布式架构 xff08 SOA 分层 按照业务性质分层 每一层要求简单 和 容易维护 应用层 距离用户最近的一层 也称之为接入层 使用tomcat 作为web容器 接收用户请求 使用下游的dubbo提供的接口来返回数据并且该层禁止访问数
  • Java的对象模型

    原文链接 对象在堆内存的布局分为三个区域 xff1a 分别是对象头 xff08 Header xff09 实例数据 xff08 Instance Data xff09 对齐填充 xff08 Padding xff09 对象头 xff1a 对
  • CopyOnWriterArrayList

    CopyOnWrite CopyOnWrite容器即写时复制的容器 通俗的理解是当我们往一个容器添加元素的时候 xff0c 不直接往当前容器添加 xff0c 而是先将当前容器进行Copy xff0c 复制出一个新的容器 xff0c 然后新的
  • Java 并发编程一篇 -(Synchronized 原理、LockSupport 原理、ReentrantLock 原理)

    并发编程已完结 xff0c 章节如下 xff1a Java 并发编程一篇 xff08 Synchronized 原理 LockSupport 原理 ReentrantLock 原理 xff09 Java 并发编程二篇 xff08 JMM C
  • Google离开我们已经快十年

    2010年1月13日 xff0c Google离开中国 掐指算来 xff0c Google已经离开我们快十年了 2010年是个特殊的年份 xff0c 这一年还发生了3Q大战 为什么诸多大事都发生在2010年 就是因为2010年是PC Web
  • Java 并发编程四篇 -(JUC、AQS 源码、ReentrantLock 源码)

    并发编程已完结 xff0c 章节如下 xff1a Java 并发编程一篇 xff08 Synchronized 原理 LockSupport 原理 ReentrantLock 原理 xff09 Java 并发编程二篇 xff08 JMM C
  • 在 IDEA 中的各种调试技巧,轻松定位 Bug(超级全面)

    原文地址
  • 20210721复盘

    1 在大厂中用的是java8吗 xff1f 还是更高版本的 xff1f 基本都是8 xff0c 一些老项目是7 2 远程面试的时候让写算法题 xff0c 是线上写还是用纸写还是用面试公司开发的工具写 xff1f 会有方法输入提示吗 xff1
  • 解决No plugin found for prefix ‘archetype‘ in the current project and in the plugin groups

    建立Maven项目时 xff0c 在cmd输入mvn archetype generate命令后出现了标题的报错 xff0c 我在settings xml里面配置的mirror是阿里云 xff0c 尝试了网上的好几个解决方案都不行 xff0
  • RocketMQ介绍和简单使用

    RocketMq下载安装 下载网址 xff1a http rocketmq apache org dowloading releases 系统要求64bit Linux Unix或Mac JDK版本 gt 61 1 8 解压后其中的目录文件
  • 如何在Oracle官网下载java的JDK最新版本和历史版本

    1 打开Oracle官网 xff0c 准备下载JDK 下载时需要使用注册用户登陆 xff0c 可以免费注册 地址 xff1a https developer oracle com 2 点击Downloads xff0c 并选择Java SE
  • 【Java】Collections集合类介绍

    Collections集合类介绍 Collections 是一个操作Set List和Map等集合的工具类 xff0c 提供了一系列静态方法对集合元素进行排序 查询和修改等操作 1 排序操作 xff08 1 xff09 reverse Li
  • AD将元器件由正面,放置到反面/元器件由反面放到正面

    按住选中元器件 xff0c 选中元器件之外的部分变灰 xff0c 选中元器件变成高亮 再按英文按键 l xff0c 实现元器件的正反面放置
  • CentOS6 yum命令报错YumRepo Error: All mirror URLs are not using ftp, http[s] or file解决

    原文地址 xff1a https www cnblogs com pistachio123 p 14301949 html 一 CentOS6 yum命令报错YumRepo Error All mirror URLs are not usi
  • Ubuntu 系统直接使用 root 用户登录实例

    Ubuntu 系统直接使用 root 用户登录实例 一般安装的Ubuntu 系统默认的不设置 root 帐户和密码 xff0c 这是为了安全做考虑 但是如果是本地虚拟机或者是作为测试使用 xff0c 直接使用root账户登录能减少很多麻烦
  • 深入浅出: 理解云原生基本原则

    云原生指的是一个敏捷的工程团队 xff0c 遵循敏捷的研发原则 xff0c 使用高度自动化的研发工具 xff0c 开发基于云基础设施和服务的应用以满足快速变化的客户需求 这些应用采用弹性 xff0c 可扩展和高可用的架构 这个工程团队通过高
  • java实现html转pdf

    1 需求 xff1a 将一个html页面转成pdf格式 2 方法 xff1a 在实现之前先考虑一个问题 xff0c pdf是前端生成还是后端生成 这里采用pdfbox 43 itext xff08 PDF文件名可自定义 xff09 技术在服
  • 生产者消费者模型(代码实现)

    一个场所 xff1a 队列 两个角色 xff1a 生产者 消费者 三种关系 xff1a 生产者 生产者 xff1a 互斥 消费者 消费者 xff1a 互斥 生产者 消费者 xff1a 同步 43 互斥 如果生产者直接将数据交给消费 xff0