9 线程池编程
创建线程要花费昂贵的资源和时间,如果任务来了才创建线程那么响应时间会变长,而且一个进程能创建的线程数有限。为了避免这些问题,在程序启动的时候就创建若干线程来响应处理,它们被称为线程池,里面的线程叫工作线程。
9.1 概念
线程池是提前创建并维护多个线程,等待管理者分配任务的机制,避免短时间线程创建和销毁的代价,一般是IO密集型的场景使用。主要包括线程管理器、任务线程、消息队列。
举例:安检、银行柜台等
- 为什么使用线程池?
频繁创建和销毁线程浪费CPU资源
- 线程池是什么?
一堆线程放在一个池子里统一管理
9.2 构成
9.2.1 任务队列job_queue
构成 |
接口 |
处理函数 |
void *(*)(void*) |
参数 |
void *arg |
队列指针 |
struct job_queue* pnext |
9.2.2 工作线程worker
9.2.3 线程池thread_pool
构成 |
接口 |
初始化 |
threadpool_init() |
销毁 |
threadpool_destroy() |
添加任务 |
threadpool_add_job() |
9.3 流程
9.4 实例
9.4.1 C++实现多线程读写
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
#include <algorithm>
#include <condition_variable>
using namespace std;
int main() {
mutex m;
condition_variable cond;
vector<int> vec;
thread t([&]() {
for(int i = 0; i < 10; ++i) {
unique_lock<mutex> lock(m);
while(vec.size()<5) cond.wait(lock); //wait
cout << "sum:" << accumulate(vec.begin(),vec.end(),0) << endl;
vec.clear();
}
});
int n;
while(cin >> n){
lock_guard<mutex> lock(m);
vec.push_back(n);
if(vec.size()>=5) cond.notify_one(); //signal
}
}
[root@localhost 8]# ./a.out
1
2
3
4
5
sum:15
6
7
8
9
10
sum:40
^C
9.4.2 线程池(重要例子)
实例:
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <functional>
using namespace std;
class Threadpool{
vector<thread> threads;
queue<function<void()>> tasks;
condition_variable cond;
mutex m;
bool exited = false;
public:
Threadpool(int num){
for(int i = 0;i < num;++i){
thread t([this](){
while(true){
function<void()> func;
{
unique_lock<mutex> lock(m);
while(tasks.empty() && !exited){
cond.wait(lock);
}
if(exited) break;
func = tasks.front();
tasks.pop();
}
func();
}
});
threads.emplace_back(move(t));
}
}
~Threadpool(){
exited = true;
cond.notify_all(); //signal 通知所有
for(auto& thread:threads){
thread.join();
}
}
Threadpool(const Threadpool&) = default;
Threadpool& operator=(const Threadpool&) = default;
void AddTask(function<void()> func){
lock_guard<mutex> guard(m);
tasks.push(func);
cond.notify_one(); //signal
}
};
int main(){
auto test = [](){
this_thread::sleep_for(2s);
cout << this_thread::get_id() << ":do something..." << endl;
};
Threadpool pool(5);
for(int i = 0;i < 15;++i){
pool.AddTask(test);
}
this_thread::sleep_for(10s); //等待15个线程执行完毕
}
优化:
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <functional>
using namespace std;
class Threadpool {
vector<thread> threads;
queue<function<void()>> tasks;
condition_variable cond;
mutex m;
bool exited = false;
public:
Threadpool(int num) {
auto routine = [this]() {
while(true) {
unique_lock<mutex> lock(m);
while(tasks.empty() && !exited) {
cond.wait(lock);
}
if(exited) break;
auto func = tasks.front();
tasks.pop();
lock.unlock();
func();
}
};
for(int i = 0; i < num; ++i) {
// thread t(routine);
// threads.emplace_back(move(t));
// threads.emplace_back(thread(routine));
threads.emplace_back(thread{routine});
}
}
~Threadpool() {
exited = true;
cond.notify_all(); //signal 通知所有
for(auto& thread:threads) {
thread.join();
}
}
Threadpool(const Threadpool&) = default;
Threadpool& operator=(const Threadpool&) = default;
void AddTask(function<void()> func) {
lock_guard<mutex> guard(m);
tasks.push(func);
cond.notify_one(); //signal
}
};
int main() {
mutex m;
auto test = [&m]() {
this_thread::sleep_for(2s);
lock_guard<mutex> guard(m);
cout << this_thread::get_id() << ":do something..." << endl;
};
Threadpool pool(5);
for(int i = 0; i < 15; ++i) {
pool.AddTask(test);
}
this_thread::sleep_for(10s); //等待15个线程执行完毕
}
[root@localhost 8]# g++ threadpool.cpp -pthread
[root@localhost 8]# ./a.out
139998701491968:do something...
139998693099264:do something...
139998684706560:do something...
139998676313856:do something...
139998709884672:do something...
139998701491968:do something...
139998693099264:do something...
139998684706560:do something...
139998676313856:do something...
139998709884672:do something...
139998701491968:do something...
139998693099264:do something...
139998684706560:do something...
139998676313856:do something...
139998709884672:do something...