Thread Pool
本文基于经典的99行代码,稍加修改使其支持C++20,并增加了wait与join功能。
#pragma once
#include <thread>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <future>
#include <functional>
#include <stdexcept>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
->std::future<typename std::invoke_result<F, Args...>::type>;
~ThreadPool();
void wait();
void join();
private:
std::vector<std::thread> workers; //线程池
std::queue<std::function<void()>> tasks; //任务队列
std::mutex queue_mutex; //队列锁
std::condition_variable condition; //状态变量
std::atomic<bool> stop; //原子变量
std::vector<unsigned char> status;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false), status(threads, 0)
{
for (size_t i = 0; i < threads; ++i)
{
workers.emplace_back( //增加工作线程
[this, i]
{
for (;;) //线程死循环
{
std::function<void()> task; //函数对象
{
std::unique_lock<std::mutex> lock(this->queue_mutex);//加锁,确保每次只有一个线程执行该语句块
this->condition.wait(lock,
[this] {return this->stop || !this->tasks.empty(); }); //等待被唤醒,当线程池停止或者任务队列非空时
if (this->stop && this->tasks.empty())
return; //当线程停止且队列空后,线程结束
task = std::move(this->tasks.front()); //获取任务队列
this->tasks.pop();
//std::cout << "Thread " << i << " is working" << std::endl;
//std::cout << "The task size is " << this->tasks.size() << std::endl;
}
this->status[i] = true;
task(); //执行任务
this->status[i] = false;
}
}
);
}
}
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::invoke_result<F, Args...>::type> //->运算符表示返回类型放在后方,返回类型为std::future类型,且由模板std::invoke_result<F, Args...>::type决定
{
using return_type = typename std::invoke_result<F, Args...>::type; //重命名返回类型
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);//构造一个函数对象智能指针,std::package_task可以包装一个异步返回的函数,std::forward可以进行完美转发。
std::future<return_type> res = task->get_future(); //res保存线程返回值
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one(); //唤醒一个线程
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all(); //唤醒所有线程
for (auto& worker : workers)
{
if (worker.joinable()) {
worker.join();
}
}
}
inline void ThreadPool::wait()
{
bool isRuning = true;
while (isRuning) {
isRuning = false;
for (auto& status : this->status) {
if (status) {
isRuning = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
inline void ThreadPool::join()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (auto& worker : workers)
{
if (worker.joinable()) {
worker.join();
}
}
}
}
进行了一定的完善,支持在C++20中使用,增加了join与wait函数。
用法:
// 创建一个4个工作线程的线程池
ThreadPool pool(4);
// 保存返回结果,std::future<int>
auto result = pool.enqueue([](int answer) { return answer; }, 42);
// 等待结果并输出
std::cout << result.get() << std::endl;
// 创建一个4个工作线程的线程池
ThreadPool pool(4);
// 创建一个future的vector批量保存结构
std::vector<std::future<int>> results;
for(int i = 0; i < 10; i++){
// 保存返回结果,std::future<int>
results.emplace_back(pool.enqueue([](int answer) { return answer*answer; }, i));
}
//输出结果
for(auto& res : results){
std::cout << res.get() << std::endl;
}
results.clear();
GitHub: Thread Pool