话不多说,上代码:
#pragma once
#include <condition_variable>
#include <deque>
#include <mutex>
#include <shared_mutex>
using namespace std;
template <typename T>
class BlockingQueue {
public:
BlockingQueue(size_t size)
: m_limitSize(size)
{
}
BlockingQueue()
: m_limitSize(5000)
{
}
void put(const T& val)
{
std::unique_lock lock(m_dequeMtx);
while (!m_shutDown && ((m_limitSize > 0 && m_deque.size() >= m_limitSize) || m_limitState)) {
if (m_limitSize > 0 && m_deque.size() >= m_limitSize) {
m_limitState = true;
}
m_notFullCond.wait(lock);
}
if (m_shutDown) {
return;
}
m_deque.push_back(val);
m_notEmptyCond.notify_all();
}
T take()
{
std::unique_lock lock(m_dequeMtx);
while (!m_shutDown && m_deque.empty()) {
m_notEmptyCond.wait(lock);
}
if (m_shutDown && m_deque.empty()) {
return T();
}
const T front = m_deque.front();
m_deque.pop_front();
if (m_deque.size() < m_limitSize / 2) {
m_notFullCond.notify_all();
m_limitState = false;
}
return front;
}
size_t size() const
{
return m_deque.size();
}
void shutDown()
{
m_shutDown = true;
}
private:
mutable std::mutex m_dequeMtx;
std::condition_variable m_notEmptyCond;
std::condition_variable m_notFullCond;
std::deque<T> m_deque;
size_t m_limitSize { 0 };
bool m_limitState { false };
bool m_shutDown { false };
};
PS:
1. size=0为无界队列,否则为有界队列。
2. 有界队列满时,阻塞生产者,队列进入限制生产态,后来的生产者直接阻塞。队列消费至半大小时,解除限制态,唤醒所有生产者。
3. 关闭阻塞队列,禁止生产,等待消费者消费完队列剩余数据,为空后获取为null值,实现优雅结束。
测试代码:
#include "BlockingQueue.h"
#include "Demo.h"
#include <bits/stdc++.h>
#include <unistd.h>
using namespace std;
int main()
{
BlockingQueue<int> deque(45);
auto thread1 = thread([&deque]() {
for (int i = 1; i < 100; i++) {
deque.put(i);
if (i % 10 == 0) {
cout << "put" << endl;
}
}
});
auto thread2 = thread([&deque]() {
for (int i = 100; i < 200; i++) {
deque.put(i);
if (i % 10 == 0) {
cout << "put" << endl;
}
}
});
auto thread3 = thread([&deque]() {
while (true) {
int i = deque.take();
if (i == 0) {
break;
}
usleep(500 - i);
cout << "take " << i << endl;
}
});
thread1.join();
thread2.join();
deque.shutDown();
thread3.join();
return 0;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)