基于C++11实现的阻塞队列(BlockQueue)

2023-05-16

思路:

生产者消费者模型

 如图,多个生产者线程和多个消费者线程共享同一固定大小的缓冲区,它们的生产和消费符合以下规则:

  • 生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据
  • 当缓冲区满的时候,生产者会进入阻塞状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据;当缓冲区空的时候,消费者也会进入阻塞状态,直到生产者往缓冲区中添加数据时才会被唤醒

所以重点为实现一个线程安全的缓冲队列:

1、首先使用STL库中的deque作为基本容器;

2、然后定义一个固定大小的容量capacity,防止无限制扩容下去导致内存耗尽;

3、之后需要两个条件变量Productor和Consumer,模拟生产者和消费者;

4、条件变量必须得有互斥量辅助,所以还需一个互斥量mutex。 

template<class T>
class BlockQueue {
public:
    explicit BlockQueue(size_t maxCapacity = 1000);
    ~BlockQueue();
private:
    std::deque<T> deq;
    size_t capacity;
    std::mutex mtx;
    std::condition_variable Consumer;
    std::condition_variable Producer;
};

主要成员函数介绍:

1、析构函数:清理队列中的所有成员,唤醒所有阻塞中的生产者、消费者线程。

template<class T>
BlockQueue<T>::~BlockQueue()
{
    {
        std::lock_guard<std::mutex> locker(mtx);
        deq.clear();
    }
    Producer.notify_all();
    Consumer.notify_all();
};

2、生产者线程应该调用的函数

(1)void BlockQueue<T>::push_back(const T& item)

template<class T>
void BlockQueue<T>::push_back(const T& item)
{
    //使用条件变量前应先对互斥量上锁,此时选择unique_lock而非lock_guard
    //因为unique_lock在上锁后允许临时解锁再加锁,而lock_guard上锁后只能在离开作用域时解锁
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.size() >= capacity)
    {
        //若队列已满,则生产者线程进入阻塞状态,自动对互斥量解锁,被唤醒后又自动对互斥量加锁
        Producer.wait(lock);
    }
    deq.push_back(item);//向队列中放入数据
    Consumer.notify_one();//唤醒一个阻塞的消费者线程
}

 (2)void BlockQueue<T>::push_front(const T& item)

template<class T>
void BlockQueue<T>::push_front(const T& item)
{
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.size() >= capacity)
    {
        Producer.wait(lock);
    }
    deq.push_front(item);
    Consumer.notify_one();
}

3、消费者线程应该调用的函数

(1)bool BlockQueue<T>::pop(T& item)

因为要实现线程安全的函数,所以将pop函数的接口设计为需要放入一个自己的T& item作为删除数据的拷贝。 

template<class T>
bool BlockQueue<T>::pop(T& item)
{
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.empty())
    {
        //若队列空,则消费者线程进入阻塞,等待被唤醒
        Consumer.wait(lock);
    }
    item = deq.front();//当作删除数据的拷贝
    deq.pop_front();
    Producer.notify_one();//唤醒一个阻塞的生产者线程
    return true;
}

(2)bool BlockQueue<T>::pop(T& item,int timeout)

为pop()函数添加第二参数timeout,设计为带计时器的pop函数。

若队列在超时时间后仍一直为空,则立刻返回false。

template<class T>
bool BlockQueue<T>::pop(T& item,int timeout)
{
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.empty())
    {
        //阻塞时间超过指定的timeout,直接返回false
        if (Consumer.wait_for(lock, std::chrono::seconds(timeout)) == std::cv_status::timeout)
            return false;
    }
    item = deq.front();
    deq.pop_front();
    Producer.notify_one();
    return true;
}

 4、一些额外的成员函数

(1)判断队列为空 

template<class T>
bool BlockQueue<T>::empty()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.empty();
}

(2)判断队列为满

template<class T>
bool BlockQueue<T>::full()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.size()>=capacity;
}

(3)模拟队列的front函数

template<class T>
T BlockQueue<T>::front()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.front();
}

(4)模拟队列的back函数

template<class T>
T BlockQueue<T>::back()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.back();
}

 

源码: 

 blockqueue.h

#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <mutex>
#include <deque>
#include <condition_variable>
#include <assert.h>
#include <chrono>

template<class T>
class BlockQueue {
public:
    explicit BlockQueue(size_t maxCapacity = 1024);

    ~BlockQueue();

    void close();

    void flush();

    void clear();

    T front();

    T back();

    size_t get_size();

    size_t get_capacity();

    void push_back(const T& item);

    void push_front(const T& item);

    bool empty();

    bool full();

    bool pop(T& item);

    bool pop(T& item, int timeout);

private:
    std::deque<T> deq;

    size_t capacity;

    std::mutex mtx;

    bool isClose;

    std::condition_variable Consumer;

    std::condition_variable Producer;
};


template<class T>
BlockQueue<T>::BlockQueue(size_t maxCapacity):capacity(maxCapacity) 
{
    assert(maxCapacity > 0);
    isClose = false;
}

template<class T>
BlockQueue<T>::~BlockQueue()
{
    close();
};

template<class T>
void BlockQueue<T>::close()
{
    {
        std::lock_guard<std::mutex> locker(mtx);
        deq.clear();
        isClose= true;
    }
    Producer.notify_all();
    Consumer.notify_all();
};

template<class T>
void BlockQueue<T>::flush()
{
    Consumer.notify_one();
}

template<class T>
void BlockQueue<T>::clear()
{
    std::lock_guard<std::mutex> lock(mtx);
    deq.clear();
}

template<class T>
T BlockQueue<T>::front()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.front();
}

template<class T>
T BlockQueue<T>::back()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.back();
}

template<class T>
size_t BlockQueue<T>::get_size()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.size();
}

template<class T>
size_t BlockQueue<T>::get_capacity()
{
    std::lock_guard<std::mutex> lock(mtx);
    return capacity;
}

template<class T>
void BlockQueue<T>::push_back(const T& item)
{
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.size() >= capacity)
    {
        Producer.wait(lock);
    }
    deq.push_back(item);
    Consumer.notify_one();
}

template<class T>
void BlockQueue<T>::push_front(const T& item)
{
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.size() >= capacity)
    {
        Producer.wait(lock);
    }
    deq.push_front(item);
    Consumer.notify_one();
}

template<class T>
bool BlockQueue<T>::empty()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.empty();
}

template<class T>
bool BlockQueue<T>::full()
{
    std::lock_guard<std::mutex> lock(mtx);
    return deq.size()>=capacity;
}

template<class T>
bool BlockQueue<T>::pop(T& item)
{
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.empty())
    {
        Consumer.wait(lock);
        if (isClose)
            return false;
    }
    item = deq.front();
    deq.pop_front();
    Producer.notify_one();
    return true;
}

template<class T>
bool BlockQueue<T>::pop(T& item,int timeout)
{
    std::unique_lock<std::mutex> lock(mtx);
    while (deq.empty())
    {
        if (Consumer.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout)
            return false;
        if (isClose)
            return false;
    }
    item = deq.front();
    deq.pop_front();
    Producer.notify_one();
    return true;
}

#endif // BLOCKQUEUE_H
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于C++11实现的阻塞队列(BlockQueue) 的相关文章

随机推荐

  • spring详解

    Spring 详解 什么是spring Spring是一个开源框架 xff0c 它由Rod Johnson创建 它是为了解决企业应用开发的复杂性而创建的 Spring使用基本的JavaBean来完成以前只可能由EJB完成的事情 然而 xff
  • 谁是你生命中的贵人

    谁是你生命中的贵人 谨以此文献给我一生至爱 作者 xff1a 成晓旭 版权保留 转载请保持文章完整性 最近很奇怪的发现 xff0c 生命中的人和事 xff0c 常常不是均匀而规律地分布在人生的时间轴上 xff0c 或许很长时间都平淡无奇 x
  • Android开发之使用BroadcastReceiver实时监听电量(源码分享)

    Android系统中实时的监听手机电量以及开机启动功能都是通过BroadcastReceiver组件实现的 我们能够动态注冊这个类的一个实例通过Context registerReceiver 方法或者静态注冊 xff0c 通过 lt Re
  • 爬取豆瓣网站前top250电影名称和评分保存到本地excel中(附效果图和代码)

    import requests from lxml import etree import xlwt headers 61 39 User Agent 39 39 Mozilla 5 0 Windows NT 10 0 Win64 x64
  • IDEA项目结构介绍

    文章目录 IDEA项目结构介绍一 创建project模块二 创建Module模块三 创建Package包四 创建class类五 代码测试 IDEA项目结构介绍 project xff08 项目 工程 xff09 module xff08 模
  • Uart串口通讯协议与环形队列(裸机/RTOS)

    MCU上使用的稳定Uart通讯协议 环形队列 协议的主要内容 xff1a 接收 xff1a 字节间超时判断 环形队列接收 非阻塞式接收整帧数据 接收查错 xff1b 发送 xff1a 未应答重发 span class token punct
  • 对Excel表格数据的导出

    一 xff1a 有Excel文件模板的导出 xff08 1 xff09 判断模板文件是否存在 1 先通过Server MapPath 将相对的路径转为实际的物理路径 2 然后在通过IO流判断文件是否存在 例 xff1a System IO
  • Linux命令

    基本操作 Linux关机 重启 关机 shutdown h now 重启 shutdown r now 查看系统 CPU信息 查看系统内核信息 uname a 查看系统内核版本 cat proc version 查看当前用户环境变量 env
  • AOP概念及作用详细解释

    今天学习spring的时候接触到一个新的知识以及概念AOP 老师当时讲这个知识点的时候 xff0c 并没有讲太多关于这方面的知识 xff0c 我们所学习到的知识侧重于应用 xff0c 所以对这一块的理论知识是有一点缺少 xff0c 以至于小
  • Permission denied解决办法之一

    最近在编一个小程序 xff0c 有一步需要我删除一个文件 xff0c 但当用remove 后返回值始终是 1 用了perror后提示 Permission denied xff0c 在搜索后总是找不到解决办法 xff0c 然后我感觉是需要删
  • 数脉科技:Windows server 2016的基础设置

    是否经常不会设置Windows server 2016这款服务器而苦恼呢 xff0c 现在不用怕了数脉科技一点一点耐心的来教会大家如何学会Windows server 2016的基础设置吧 xff01 一 组策略 WIN键 43 R键 xf
  • 2020-09-11

    三招你成为短视频大神 xff01 很多小伙伴对于自媒体比较眼红 xff0c 看着抖音快手上很多靠直播 靠短视频火起来的人都挣到钱了 xff0c 所以跃跃欲试 xff0c 也想在这里面分一杯羹 但是真的那么容易吗 xff1f 直播就不用说了
  • 2020-10-15

    短视频配音选材的方法 网络短视频内容花样众多 xff0c 在快手短视频或者抖音短视频上面总是能刷到很多新奇的东西 xff0c 其中给短视频配音也是一个模块 xff0c 根据剧情来即兴发挥或者编排台词都是很重要的一部分 给短视频配音又分好多种
  • 服务器处理能力,你估算正确过吗?

    服务器处理能力 xff0c 你估算正确过吗 xff1f 作者 xff1a 成晓旭 1 引题 但凡写过技术方案的都知道 xff0c 在技术方案最终落实到工程实施部署时 xff0c 必须编制出当前解决方案需要部署的IT设备及环境 xff0c 包
  • 2020-10-20

    抖音快手短剧本写作方法 看别人拍的视频拍的新鲜有趣 xff0c 但是自己就是拍不出那种效果 xff0c 其实抖音视频前期是非常好做的 xff0c 可能发一个PPT一样的几张靓照或者是逗猫咪的小视频 xff0c 就能轻松获赞数十万 但是有个词
  • 2020-11-02

    如何更好的制作短视频内容 xff1f 细节决定成败 xff0c 对抖音短视频尤其重要 xff0c 抖音本来就是以内容为主的短视频平台 一个短视频能火 xff0c 一个短视频不火 xff0c 相同的主题 xff0c 不同的细节 xff0c 它
  • 2020-11-24

    如何才能撰写出好的短视频文案 首先是要调起用户的共鸣 xff0c 比如以前抖音上很火的一个视频 视频拍的很简单 xff0c 就是坐在出租车上拍摄的一些 xff0c 呼啸而过的高楼大厦和车流 xff0c 以及灰色的天空和不断向后跑的树 视频和
  • 2020-11-26

    短剧本创作小技巧 xff0c 提升完播率 相信每一个编剧在自己的短剧本拍成视频时 xff0c 都会有满足感和成就感 但有的时候 xff0c 也会辗转难眠 xff0c 而其中遇到了最大问题就是完播率太低 而影响完播率的关键所在 xff0c 就
  • 2020-11-28

    短剧本中的场景选择 一般在写短剧本的时候 xff0c 一个故事里面会有很多的故事场景 xff0c 场景写的比较好就会使短剧本在转换为段视频的时候 xff0c 制作的短视频会更加的好看 故事的场景也算得上是一个故事的主要开端 xff0c 开端
  • 基于C++11实现的阻塞队列(BlockQueue)

    思路 xff1a 生产者消费者模型 如图 xff0c 多个生产者线程和多个消费者线程共享同一固定大小的缓冲区 xff0c 它们的生产和消费符合以下规则 xff1a 生产者不会在缓冲区满的时候继续向缓冲区放入数据 xff0c 而消费者也不会在