文章目录
- 一、引入
- 二、生产者消费者模型
- 2.1 三者关系
- 2.2 生产者消费者模型基本原则
- 2.3 生产者消费者模型的好处
- 三、基于阻塞队列的生产者消费者模型
- 3.1 原理
- 3.2 代码实现
- 3.3 pthread_cond_wait的第二个参数
- 3.4 pthread_cond_wait伪唤醒
- 四、阻塞队列的应用
- 五、总结
一、引入
举个例子,比方说我们想买方便面,假如现在没有超市,我们只能去供货商那里买东西,我们要一件供货商生产一件。但是对于供货商来说成本太大了。所以现在有了超市这个媒介。
消费者和生产者通过超市间接进行交易。这样当生产者不需要的时候供货商可能还在生产,当供货商不生产的时候消费者还能买到。这样就把消费和消费进行解耦。我们把超市叫做缓冲区。
那么什么叫做解耦呢?我们举个反例:
当我们main调用函数的时候,main函数会生产数据交给函数,函数可以把数据暂时保存,而函数也消费了数据,符合生产者消费者模型。
但是当我们开始调用的时候main函数就什么也不干,在那里阻塞等待函数的返回,我们把main函数和调用函数之间的关系称为强耦合关系。
二、生产者消费者模型
首先要知道生产者消费者都要看到“超市”,所以“超市”是一块共享资源。而既然是共享资源就会涉及到多线程访问,那么这块共享资源就要被保护起来。
2.1 三者关系
生产者和生产者之间是互斥关系。
消费者和消费者之间是互斥关系。
生产者和消费者之间是互斥+同步。
这里的互斥是为了保证共享资源的安全性,同步是为了提高访问效率。
2.2 生产者消费者模型基本原则
我们只需要记住“321”原则:
3: 三种关系。
2: 两种角色,生产者线程、消费者线程。
1: 一个交易场所(特定结构的缓冲区)。
2.3 生产者消费者模型的好处
1️⃣ 把生产线程和消费线程进行解耦。
2️⃣ 支持消费和生产一段时间的忙闲不均问题。
3️⃣ 让消费者专注消费,生产者专注生产,提高效率。
但是这里不一定能保证高效。因为可能超市满了,那么生产者只能等待了,或者超市为空,消费者进行等待。
三、基于阻塞队列的生产者消费者模型
3.1 原理
当队列为空的时候,从队列中获取元素的线程将被阻塞,直到队列被放入元素。
当队列已满的时候,往队列放入元素的线程将被阻塞,直到有元素被取出。
3.2 代码实现
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
template <class T>
class BlockQueue
{
public:
BlockQueue(const int& maxcap = 5)
: _max(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
if(_q.size() == _max)
{
pthread_cond_wait(&_pcond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
if(_q.empty())
{
pthread_cond_wait(&_ccond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
std::queue<T> _q;
int _max;
pthread_mutex_t _mutex;
pthread_cond_t _pcond;
pthread_cond_t _ccond;
};
#include "BlockQueue.hpp"
using std::cout;
using std::endl;
void* consumer(void *_pbq)
{
BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
while(true)
{
int val;
pbq->pop(&val);
cout << "消费数据: " << val << endl;
}
}
void* productor(void *_pbq)
{
BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
while(true)
{
int val = rand() % 100 + 1;
pbq->push(val);
cout << "生产数据: " << val << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ (unsigned long)time(nullptr));
BlockQueue<int> *pbq = new BlockQueue<int>();
pthread_t con, pro;
pthread_create(&con, nullptr, consumer, pbq);
pthread_create(&pro, nullptr, productor, pbq);
pthread_join(con, nullptr);
pthread_join(pro, nullptr);
return 0;
}
当生产者生产的慢的时候,因为消费者一直在读取数据,会出现生产一个消费一个的情况。
当消费者慢的时候,生产者会先把阻塞队列填满,生产者开始等待,当消费者开始消费的时候,就会出现消费一个,生产一个的情况,消费者按顺序读取阻塞队列中的值。
3.3 pthread_cond_wait的第二个参数
这里的第二个参数必须是当前正在使用的互斥锁。
因为我们满了就会进行等待,如果像之前一样把锁拿走,那么其他线程就无法访问共享资源,也就是消费者无法拿到数据。
在pthread_cond_wait
调用的时候会自动把锁释放,并把自己挂起。
而被唤醒返回的时候会自动的重新获取传入的锁。
3.4 pthread_cond_wait伪唤醒
还有一种情况,我们只有一个消费线程,但有十个生产线程,而我们可能使用的是pthread_cond_broadcast
唤醒了一批线程。
所以这十个线程被唤醒了后就要直接全部push数据,这样就出现了问题。
所以这里不应该用if,应该用while,当被唤醒以后继续进行判断是否为满,消费者线程同理。
四、阻塞队列的应用
我们现在想写一个计算器小程序:
我们不仅可以往阻塞队列中放入数据,也可以放入任务(函数)。我们直接把任务传递给阻塞队列,然后就不用管了,让消费者拿到任务进行处理。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
class Task
{
typedef std::function<int(int, int, char)> func_t;
public:
Task()
{}
Task(int x, int y, char op, func_t func)
: _x(x)
, _y(y)
, _op(op)
, _func(func)
{}
std::string operator()()
{
int res = _func(_x, _y, _op);
char buf[64];
snprintf(buf, sizeof buf, "%d %c %d = %d", _x, _op, _y, res);
return buf;
}
private:
int _x;
int _y;
char _op;
func_t _func;
};
接下来把阻塞队列也要修改一下:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
template <class T>
class BlockQueue
{
public:
BlockQueue(const int& maxcap = 5)
: _max(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
while(_q.size() == _max)
{
pthread_cond_wait(&_pcond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
while(_q.empty())
{
pthread_cond_wait(&_ccond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
std::queue<T> _q;
int _max;
pthread_mutex_t _mutex;
pthread_cond_t _pcond;
pthread_cond_t _ccond;
};
上面的任务模型里面有一个func_t的回调函数,这个函数就是进行数据计算的回调函数。
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <cstdio>
#include <unordered_map>
using std::cout;
using std::endl;
const std::string oper = "+-*/";
std::unordered_map<char, std::function<int(int, int)>> hash = {
{'+', [](int x, int y)->int{return x + y;}},
{'-', [](int x, int y)->int{return x - y;}},
{'*', [](int x, int y)->int{return x * y;}},
{'/', [](int x, int y)->int{
if(y == 0)
{
std::cerr << "除0错误" << endl;
return -1;
}
return x / y;}},
};
int myMath(int x, int y, char op)
{
int res = hash[op](x, y);
return res;
}
void* consumer(void *_pbq)
{
BlockQueue<Task> *pbq = static_cast<BlockQueue<Task> *>(_pbq);
while(true)
{
Task t;
pbq->pop(&t);
cout << "消费数据: " << t() << endl;
}
}
void* productor(void *_pbq)
{
BlockQueue<Task> *pbq = static_cast<BlockQueue<Task> *>(_pbq);
while(true)
{
sleep(1);
int x = rand() % 100 + 1;
int y = rand() % 10 + 1;
int operidx = rand() % oper.size();
char op = oper[operidx];
Task t(x, y, op, myMath);
pbq->push(t);
printf("生产数据: %d %c %d = ?\n", x, op, y);
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ (unsigned long)time(nullptr));
BlockQueue<Task> *pbq = new BlockQueue<Task>();
pthread_t con, pro;
pthread_create(&con, nullptr, consumer, pbq);
pthread_create(&pro, nullptr, productor, pbq);
pthread_join(con, nullptr);
pthread_join(pro, nullptr);
return 0;
}
五、总结
我们看这样一个模型:
因为阻塞队列是临界资源,每次只有一个线程能够进入,那么生产者消费者模型高效在哪里呢?
首先要思考生产者的数据从哪来?消费者拿完数据后需不需要时间执行?这些都是需要消耗时间的。例如当消费者1拿到数据后进行处理,此时消费者2就能去阻塞队列里拿数据。
所以生产者消费者模型并不是高效在阻塞队列中,而是高效在生产之前和消费之后。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)