【linux】基于阻塞队列的生产者消费者模型(条件变量)

2023-05-16

文章目录

  • 一、引入
  • 二、生产者消费者模型
    • 2.1 三者关系
    • 2.2 生产者消费者模型基本原则
    • 2.3 生产者消费者模型的好处
  • 三、基于阻塞队列的生产者消费者模型
    • 3.1 原理
    • 3.2 代码实现
    • 3.3 pthread_cond_wait的第二个参数
    • 3.4 pthread_cond_wait伪唤醒
  • 四、阻塞队列的应用
  • 五、总结

一、引入

举个例子,比方说我们想买方便面,假如现在没有超市,我们只能去供货商那里买东西,我们要一件供货商生产一件。但是对于供货商来说成本太大了。所以现在有了超市这个媒介。
在这里插入图片描述

消费者和生产者通过超市间接进行交易。这样当生产者不需要的时候供货商可能还在生产,当供货商不生产的时候消费者还能买到。这样就把消费和消费进行解耦。我们把超市叫做缓冲区

那么什么叫做解耦呢?我们举个反例:

当我们main调用函数的时候,main函数会生产数据交给函数,函数可以把数据暂时保存,而函数也消费了数据,符合生产者消费者模型。
但是当我们开始调用的时候main函数就什么也不干,在那里阻塞等待函数的返回,我们把main函数和调用函数之间的关系称为强耦合关系

二、生产者消费者模型

首先要知道生产者消费者都要看到“超市”,所以“超市”是一块共享资源。而既然是共享资源就会涉及到多线程访问,那么这块共享资源就要被保护起来。

2.1 三者关系

生产者和生产者之间是互斥关系
消费者和消费者之间是互斥关系
生产者和消费者之间是互斥+同步

这里的互斥是为了保证共享资源的安全性,同步是为了提高访问效率。

2.2 生产者消费者模型基本原则

我们只需要记住“321”原则:
3: 三种关系。
2: 两种角色,生产者线程、消费者线程。
1: 一个交易场所(特定结构的缓冲区)。

2.3 生产者消费者模型的好处

1️⃣ 把生产线程和消费线程进行解耦。
2️⃣ 支持消费和生产一段时间的忙闲不均问题。
3️⃣ 让消费者专注消费,生产者专注生产,提高效率。

但是这里不一定能保证高效。因为可能超市满了,那么生产者只能等待了,或者超市为空,消费者进行等待。

三、基于阻塞队列的生产者消费者模型

3.1 原理

在这里插入图片描述
当队列为空的时候,从队列中获取元素的线程将被阻塞,直到队列被放入元素。
当队列已满的时候,往队列放入元素的线程将被阻塞,直到有元素被取出。

3.2 代码实现

// BlockQueue.hpp
#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap = 5)
        : _max(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    void push(const T &in)
    {
        // 保护安全
        pthread_mutex_lock(&_mutex);

        if(_q.size() == _max)
        {
            // 如果满了就等待
            pthread_cond_wait(&_pcond, &_mutex);
        }
        _q.push(in);
        // 有数据了,唤醒消费者线程
        pthread_cond_signal(&_ccond);

        pthread_mutex_unlock(&_mutex);
    }

    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        
        if(_q.empty())
        {
            // 如果空了就等待
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        // 有空位置,唤醒生产者线程
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    std::queue<T> _q;
    int _max;// 元素上限
    pthread_mutex_t _mutex;// 保护共享资源
    pthread_cond_t _pcond;// 生产者条件变量
    pthread_cond_t _ccond;// 消费者条件变量
};

// Main.cc
#include "BlockQueue.hpp"

using std::cout;
using std::endl;

// 消费者
void* consumer(void *_pbq)
{
    BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
    while(true)
    {
        int val;
        pbq->pop(&val);
        cout << "消费数据: " << val << endl;
    }
}
// 生产者
void* productor(void *_pbq)
{
    BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
    while(true)
    {
        int val = rand() % 100 + 1;
        pbq->push(val);
        cout << "生产数据: " << val << endl;
    }
}

int main()
{
    srand((unsigned long)time(nullptr) ^ (unsigned long)time(nullptr));
    BlockQueue<int> *pbq = new BlockQueue<int>();
    pthread_t con, pro;
    pthread_create(&con, nullptr, consumer, pbq);
    pthread_create(&pro, nullptr, productor, pbq);

    pthread_join(con, nullptr);
    pthread_join(pro, nullptr);
    return 0;
}

当生产者生产的慢的时候,因为消费者一直在读取数据,会出现生产一个消费一个的情况。
在这里插入图片描述
当消费者慢的时候,生产者会先把阻塞队列填满,生产者开始等待,当消费者开始消费的时候,就会出现消费一个,生产一个的情况,消费者按顺序读取阻塞队列中的值。
在这里插入图片描述

3.3 pthread_cond_wait的第二个参数

在这里插入图片描述
这里的第二个参数必须是当前正在使用的互斥锁
因为我们满了就会进行等待,如果像之前一样把锁拿走,那么其他线程就无法访问共享资源,也就是消费者无法拿到数据。
pthread_cond_wait调用的时候会自动把锁释放,并把自己挂起。
而被唤醒返回的时候会自动的重新获取传入的锁

3.4 pthread_cond_wait伪唤醒

还有一种情况,我们只有一个消费线程,但有十个生产线程,而我们可能使用的是pthread_cond_broadcast唤醒了一批线程。
在这里插入图片描述
所以这十个线程被唤醒了后就要直接全部push数据,这样就出现了问题
所以这里不应该用if,应该用while,当被唤醒以后继续进行判断是否为满,消费者线程同理。
在这里插入图片描述

四、阻塞队列的应用

我们现在想写一个计算器小程序:

在这里插入图片描述
我们不仅可以往阻塞队列中放入数据,也可以放入任务(函数)。我们直接把任务传递给阻塞队列,然后就不用管了,让消费者拿到任务进行处理。

// Task.hpp
#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <cstdio>

class Task
{
    typedef std::function<int(int, int, char)> func_t;
public:
    Task()
    {}

    Task(int x, int y, char op, func_t func)
        : _x(x)
        , _y(y)
        , _op(op)
        , _func(func)
    {}

    std::string operator()()
    {
        int res = _func(_x, _y, _op);
        char buf[64];
        snprintf(buf, sizeof buf, "%d %c %d = %d", _x, _op, _y, res);
        return buf;
    }
private:
    int _x;
    int _y;
    char _op; 
    func_t _func;
};

接下来把阻塞队列也要修改一下:

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>


template <class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap = 5)
        : _max(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    void push(const T &in)
    {
        // 保护安全
        pthread_mutex_lock(&_mutex);

        while(_q.size() == _max)
        {
            // 如果满了就等待
            pthread_cond_wait(&_pcond, &_mutex);
        }
        _q.push(in);
        // 有数据了,唤醒消费者线程
        pthread_cond_signal(&_ccond);

        pthread_mutex_unlock(&_mutex);
    }

    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        
        while(_q.empty())
        {
            // 如果空了就等待
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        // 有空位置,唤醒生产者线程
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    std::queue<T> _q;
    int _max;// 元素上限
    pthread_mutex_t _mutex;// 保护共享资源
    pthread_cond_t _pcond;// 生产者条件变量
    pthread_cond_t _ccond;// 消费者条件变量
};

上面的任务模型里面有一个func_t的回调函数,这个函数就是进行数据计算的回调函数。

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <cstdio>
#include <unordered_map>

using std::cout;
using std::endl;

const std::string oper = "+-*/";

std::unordered_map<char, std::function<int(int, int)>> hash = {
        {'+', [](int x, int y)->int{return x + y;}},
        {'-', [](int x, int y)->int{return x - y;}},
        {'*', [](int x, int y)->int{return x * y;}},
        {'/', [](int x, int y)->int{
            if(y == 0)
            {
                std::cerr << "除0错误" << endl;
                return -1;
            }
            return x / y;}},
    };

int myMath(int x, int y, char op)
{
    int res = hash[op](x, y);
    return res;
}

// 消费者
void* consumer(void *_pbq)
{
    BlockQueue<Task> *pbq = static_cast<BlockQueue<Task> *>(_pbq);
    while(true)
    {
        Task t;
        pbq->pop(&t);
        cout << "消费数据: " << t() << endl;
        //sleep(1);
    }
}
// 生产者
void* productor(void *_pbq)
{
    BlockQueue<Task> *pbq = static_cast<BlockQueue<Task> *>(_pbq);
    while(true)
    {
        sleep(1);
        int x = rand() % 100 + 1;
        int y = rand() % 10 + 1;
        int operidx = rand() % oper.size();
        char op = oper[operidx];
        Task t(x, y, op, myMath);
        pbq->push(t);
        printf("生产数据: %d %c %d = ?\n", x, op, y);
    }
}

int main()
{
    srand((unsigned long)time(nullptr) ^ (unsigned long)time(nullptr));
    BlockQueue<Task> *pbq = new BlockQueue<Task>();
    pthread_t con, pro;
    pthread_create(&con, nullptr, consumer, pbq);
    pthread_create(&pro, nullptr, productor, pbq);

    pthread_join(con, nullptr);
    pthread_join(pro, nullptr);
    return 0;
}

五、总结

我们看这样一个模型:
在这里插入图片描述
因为阻塞队列是临界资源,每次只有一个线程能够进入,那么生产者消费者模型高效在哪里呢?
首先要思考生产者的数据从哪来?消费者拿完数据后需不需要时间执行?这些都是需要消耗时间的。例如当消费者1拿到数据后进行处理,此时消费者2就能去阻塞队列里拿数据。
所以生产者消费者模型并不是高效在阻塞队列中,而是高效在生产之前和消费之后



本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【linux】基于阻塞队列的生产者消费者模型(条件变量) 的相关文章

  • 2021-09-12

  • 2021-10-14

  • 【无标题】

    学生成绩管理
  • 【无标题】

    零售业管理
  • Android中ScrollView使用详解

    滚动视图 xff08 ScrollView xff09 是指当拥有很多内容 xff0c 屏幕显示不完时 xff0c 需要通过滚动来显示完整的视图 包括水平滚动视图 xff08 HorizontalScrollView xff09 和垂直滚动
  • 【JAVA】快速排序

    快排 xff0c 和冒泡排序一样 xff0c 是同一类型的排序 xff0c 都是交换排序 交换 xff0c 涉及在遍历中比较 xff0c 然后相互交换元素 冒泡排序是根据趟数两两比较 xff0c 边比较边交换 xff0c 快排也一样 xff
  • 二叉树C语言构建及功能实现

    关于二叉树 xff0c 出现的印象一般就是一个 丫 字 xff0c 这看似没有毛病 xff0c 但其实也真没有什么毛病 xff0c 逻辑上的二叉树差不多就是这个样子 可是当我们已经在逻辑上构建好了二叉树之后 xff0c 真的不想去动手实现实
  • Spring项目的创建和使用

    目录 Spring项目的创建和使用 1 创建Spring项目 1 1基于maven创建项目1 2 在maven项目中添加Spring核心包 1 3 创建一个启动类 2 将对象存储到Spring中 2 1创建一个业务对象 2 2将业务对象储存
  • 二叉树创建的两种方法(图解)

    目录 一 括号表示法 xff08 1 xff09 括号表示法构建二叉树的算法思路及算法实现 xff08 2 xff09 图解括号表示法构建二叉树 xff08 3 xff09 测试程序 二 扩展二叉树 xff08 1 xff09 扩展二叉树构
  • 【嵌入式Linux】手机连接Linux系统的adb操作

    目录 手机连接Linux系统的adb操作过程adb控制指令测试 手机连接Linux系统的adb操作过程 通过了解 xff0c 安卓的底层也是Linux xff0c 这样我们就可以通过Linux控制安卓手机完成一系列指令 xff0c 在这里需
  • GUI(图形用户界面)之事件处理

    Swing组件中的事件处理专门用于响应用户的操作 xff0c 例如 xff0c 响应用户的鼠标单击 按下键盘等操作 在Swing事件处理的过程中 xff0c 主要涉及三类对象 xff1a 1 事件源 xff1a 事件发生的场所 xff0c
  • 实验3:生产者消费者问题实践(编程实验)

    第1关 xff1a 生产者消费者问题实践 任务要求 xff1a 生产者 消费者之间设置一个具有n个缓存区的缓冲池 xff0c 生产者进程将他所生产的产品放入一个缓冲 xff1b 消费者进程可以从一个缓冲区中取走产品去消费 老板不允许消费者进
  • Arrays常用方法(超详解)

    博客网站地址 http xiaohe blog top 目录 导包 1 Arrays toString 方法 2 Arrays sort 方法 3 Arrays equals 方法 4 Arrays binarySearch 5 Array
  • Maven,MyBatis详解

    本章是博主对maven和mybatis的个人理解 xff0c 希望对大家有帮助 文章目录 文章目录 一 Maven简介 xff1a 二 MyBatis介绍 xff1a 总结 xff1a 一 Maven简介 xff1a 1 Maven是专门用
  • maven配置阿里镜像,解决IDEA配置maven恢复默认配置问题

    文章目录 1 三个IDEA配置说明2 拷贝与修改settings xml2 1 找到IDEA的mave配置文件settings xml位置2 2 拷贝IDEA的settings xml到 m2目录下2 3 打开settings xml xf
  • Android 获取ListView滚动距离方法

    今天想做个导航栏渐变的效果 xff0c 原来这种效果是通过监听ScrollView的滚动距离实现的 由于首页图片较多 xff0c 而且里面还嵌套了ListView xff0c 大家都知道ListView嵌套在ScrollView中是需要计算
  • Linux如何开启远程连接

    第一步 打开终端 第二步 安装 OpenSSH server 输入以下命令 sudo apt update sudo apt upgrade sudo apt install openssh server 第三步 启用并验证ssh服务正在运
  • 完全平方数(C语言)

    一个简单的完全平方数问题 include lt math h gt math库 include lt stdio h gt int main long int i x y z for i 61 1 i lt 100000 i 43 43 x
  • 十进制转化为八进制和十六进制

    用C语言可以轻松的把10进制转化为8进制或16进制 include lt stdio h gt int main void int m printf 34 输入你想要转化的十进制整数 34 scanf 34 d 34 amp m print
  • Python处理大量csv(Excel)文件将数据保留两位小数

    Python处理大量csv文件是十分方便的 xff0c Python有许多用于数据处理数据分析的库 用Excel处理csv文件很痛苦 xff0c 效率很低 xff0c 没法批量保留两位小数 在处理需要大量数据的机器学习和神经网络的csv文件

随机推荐

  • BMI(体重指数)计算C语言

    计算BMI需要计算BMI的公式 xff0c 需要与用户交互 用户要输入体重和身高 include lt stdio h gt int main void BMI计算公式 float weight float height float BMI
  • SQL插入数据

    插入完整的行简单的一种方法 这种语法很简单 xff0c 但是避免使用 依赖列的定义次序 xff0c 在结构变动后不安全 INSERT INTO Customers INTO关键字保证代码可移植 VALUES 100000006 与列的定义次
  • 用栈实现中缀表达式求值

    include lt stdio h gt include lt string h gt include lt stdlib h gt define MAX 100 define Error 0 define True 1 typedef
  • 循环打印九九乘法表

    下面为三种打印九九乘法表的方法 第一种是使用while循环 xff0c 第二种是使用for循环 xff0c 第三种是使用while和for循环 其实还有很多种实现方式 xff0c 但是核心是利用循环实现 while循环实现九九乘法表 i 6
  • Python猜单词小程序

    import random flag 61 39 Y 39 print 34 欢迎使用猜单词小程序 34 while flag 61 61 39 Y 39 and flag 61 39 N 39 WORDS 61 39 easy 39 39
  • 扑克牌Python实现

    扑克牌可以洗牌 扑克牌有花色和数字 四个人的版本 每个人13张牌 对牌可以实现自动排序 xff08 先按照花色 xff0c 再按照数字 xff09 import random def gen pocker n 34 34 34 生成初始扑克
  • linux下执行sh脚本,提示Command not found解决办法

    确保用户对文件有读写及执行权限 xff08 如果此时的权限是 dr xff0c 没有x xff0c 就说明没有执行的权限 xff09 xff1a chmod a 43 x xxx sh
  • Python猜单词小游戏(GUI代码实现)

    本代码实现了带有用户界面的猜单词小游戏 xff0c 使用类进行编程 xff0c 用到了tkinter random等库 主界面可以进行猜单词游戏 xff0c 包含提示功能 更新单词 提交 退出等功能 菜单界面可以实现导入词库 xff0c 显
  • 伪黑客Windows下常使用的小黑窗口cmd,以及DOS(磁盘操作系统)

    什么是DOS xff1f 简单的来说 xff0c 就是使用命令的方式来操作电脑磁盘的文件 xff08 现在基本上都被图形化界面操作的方式取代 xff09 xff0c 如增删改查等操作 xff0c 是比较久远的一种操作系统 xff08 感兴趣
  • 【归并排序】C++数据结构实现归并排序完整代码

    归并排序 C 43 43 数据结构实现归并排序完整代码 归并排序 xff08 Merging Sort xff09 定义 xff1a 把两个或者多个有序的序列合并为一个 递归调用方式实现方式实现代码 xff1a 一 归并排序函数入口 归并排
  • 【css提取数据】

    css提取数据 一 了解网页代码格式二 标签之间的关系1 引入库 三 css选择器1 了解解析对象 xff0c 导入相关模块2 标签选择器3 类选择器4 id选择器5 组合选择器6 伪类选择器7 属性提取器 总结 提取数据方法有很多 xff
  • ubuntu 18.04 安装pycharm社区版以及创建桌面快捷方式

    1 下载 Download PyCharm Python IDE for Professional Developers by JetBrains 2 解压提取到此处 3 安装 xff08 1 xff09 打开终端 xff0c 进入pych
  • docker各种报错解决

    目录 问题1 Get https registry 1 docker io v2 context deadline exceeded 解决方法 问题反思 问题2 Error response from daemon Get 34 https
  • org.slf4j.Logger无法输出日志的BUG

    场景 依赖 lt dependency gt lt groupId gt org apache zookeeper lt groupId gt lt artifactId gt zookeeper lt artifactId gt lt v
  • 【安全知识】——SSH的两种远程登录方法详解

    作者名 xff1a Demo不是emo 主页面链接 xff1a 主页传送门 博主简介 xff1a 一 个普通的大二学生 xff0c 在CSDN写博客主要是为了分享自己的学习历程 xff0c 学习方法 xff0c 总结的经验等等 xff0c
  • scanf()函数错误C4996解决办法(严重性 代码 说明 项目 文件 行 禁止显示状态)

    问题如下 xff1a 严重性 代码 说明 项目 文件 行 禁止显示状态 错误 C4996 scanf This function or variable may be unsafe Consider using scanf s instea
  • Uubuntu 更新内核出现的问题_libssl3

    电脑系统 xff1a ubuntu系统 ubuntu版本 xff1a 1804 内核版本 xff1a 5 17 15 内核从5 15 升级到5 17后 xff0c 1 xff0c 在安装N卡驱动的时候 xff0c error xff1a l
  • 总结Vue中index.html、main.js、App.vue、index.js之间关系以及Vue项目加载流程

    总结Vue中index html main js App vue index js之间关系以及Vue项目加载流程 文章目录 总结Vue中index html main js App vue index js之间关系以及Vue项目加载流程1
  • 安装Hisat2

    一 xff08 MobaXterm Personal xff09 安装aspera 首先进行预编译解压安装 xff1a mkdir Biosofts unzip hisat2 2 2 1 Linux x86 64 zip d Biosoft
  • 【linux】基于阻塞队列的生产者消费者模型(条件变量)

    文章目录 一 引入二 生产者消费者模型2 1 三者关系2 2 生产者消费者模型基本原则2 3 生产者消费者模型的好处 三 基于阻塞队列的生产者消费者模型3 1 原理3 2 代码实现3 3 pthread cond wait的第二个参数3 4