c++17实现同步阻塞队列

2023-05-16

话不多说,上代码:

#pragma once

#include <condition_variable>
#include <deque>
#include <mutex>
#include <shared_mutex>

using namespace std;

template <typename T>
class BlockingQueue {
public:
    BlockingQueue(size_t size)
        : m_limitSize(size)
    {
    }
    BlockingQueue()
        : m_limitSize(5000)
    {
    }

    void put(const T& val)
    {
        std::unique_lock lock(m_dequeMtx);
        while (!m_shutDown && ((m_limitSize > 0 && m_deque.size() >= m_limitSize) || m_limitState)) {
            if (m_limitSize > 0 && m_deque.size() >= m_limitSize) {
                m_limitState = true;
            }
            m_notFullCond.wait(lock);
        }
        if (m_shutDown) {
            return;
        }
        m_deque.push_back(val);
        m_notEmptyCond.notify_all();
    }

    T take()
    {
        std::unique_lock lock(m_dequeMtx);
        while (!m_shutDown && m_deque.empty()) {
            m_notEmptyCond.wait(lock);
        }
        if (m_shutDown && m_deque.empty()) {
            return T();
        }
        const T front = m_deque.front();
        m_deque.pop_front();

        if (m_deque.size() < m_limitSize / 2) {
            m_notFullCond.notify_all();
            m_limitState = false;
        }
        return front;
    }

    size_t size() const
    {
        return m_deque.size();
    }

    void shutDown()
    {
        m_shutDown = true;
    }

private:
    mutable std::mutex m_dequeMtx;
    std::condition_variable m_notEmptyCond;
    std::condition_variable m_notFullCond;
    std::deque<T> m_deque;
    size_t m_limitSize { 0 };
    bool m_limitState { false };
    bool m_shutDown { false };
};

PS:

1. size=0为无界队列,否则为有界队列。

2. 有界队列满时,阻塞生产者,队列进入限制生产态,后来的生产者直接阻塞。队列消费至半大小时,解除限制态,唤醒所有生产者。

3.  关闭阻塞队列,禁止生产,等待消费者消费完队列剩余数据,为空后获取为null值,实现优雅结束。

测试代码:

#include "BlockingQueue.h"
#include "Demo.h"
#include <bits/stdc++.h>
#include <unistd.h>

using namespace std;

int main()
{
    BlockingQueue<int> deque(45);

    auto thread1 = thread([&deque]() {
        for (int i = 1; i < 100; i++) {
            deque.put(i);
            if (i % 10 == 0) {
                cout << "put" << endl;
            }
        }
    });
    auto thread2 = thread([&deque]() {
        for (int i = 100; i < 200; i++) {
            deque.put(i);
            if (i % 10 == 0) {
                cout << "put" << endl;
            }
        }
    });
    auto thread3 = thread([&deque]() {
        while (true) {
            int i = deque.take();
            if (i == 0) {
                break;
            }
            usleep(500 - i);
            cout << "take " << i << endl;
        }
    });
    thread1.join();
    thread2.join();
    deque.shutDown();
    thread3.join();

    return 0;
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

c++17实现同步阻塞队列 的相关文章

随机推荐