文章目录
- 线程池的定义
- 使用线程池的原因
- 基于POSIX实现的线程池
- 基于block队列的线程池实现
- 基于ring队列的线程池实现
- 设计单例模式线程池
线程池的定义
线程池就一堆已经创建好的任务线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这线程池里取一个空闲的线程来处理任务,当任务处理完成后再次把线程返回线程池里(把线程置于空闲等待状态),以供后面线程继续使用。当线程池里所有的线程都处于忙碌状态时,可以根据情况进行等待或创建一个新的线程放入线程池里。
使用线程池的原因
线程的创建和销毁相对于进程的创建和销毁来说是轻量级的(即开销小),但是当我们的任务需要进行大量线程的创建和销毁时,这些开销合在一起就比较大了。比如,当设计一个压力性能测试框架时,需要连续产生大量的并发操作。线程池在这种场合是非常适用的。线程池的好处就在于线程复用,某个线程在处理完一个任务后,可以继续处理下一个任务,不用重新创建和销毁,避免了无谓的开销,因此线程池适用于连续产生大量并发任务的场合。
基于POSIX实现的线程池
在了解线程池的基本原理,下面我们用c++传统的方式也就POSIX来实现一个基本的线程池,该线程池虽然简单,但能体现线程池的基本工作原理。线程池的实现千变万化,有时候要根据实际的场合来定制,当原理都是一样的,
基于block队列的线程池实现
- 实现block队列
- 创建一堆线程,线程从队列拿取任务
#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include <semaphore.h>
#include<unistd.h>
namespace ns_block_queue_pthread_pool
{
template <class T>
class pthread_pool
{
private:
int num_;
std::queue<T> task_queue_;
pthread_cond_t task_queue_cond_;
pthread_mutex_t mtx_;
private:
void Lock()
{
pthread_mutex_lock(&mtx_);
}
void UnLock()
{
pthread_mutex_unlock(&mtx_);
}
bool IsEmpty()
{
return task_queue_.empty();
}
void WakeUpThread()
{
pthread_cond_signal(&task_queue_cond_);
}
public:
pthread_pool(int num )
: num_(num)
{
pthread_cond_init(&task_queue_cond_, nullptr);
pthread_mutex_init(&mtx_, nullptr);
}
~pthread_pool()
{
pthread_cond_destroy(&task_queue_cond_);
pthread_mutex_destroy(&mtx_);
}
void TaskPop(T &out)
{
Lock();
while (IsEmpty())
{
pthread_cond_wait(&task_queue_cond_, &mtx_);
}
out = task_queue_.front();
task_queue_.pop();
UnLock();
}
void TaskPush(T &in)
{
Lock();
task_queue_.push(in);
UnLock();
WakeUpThread();
}
static void *routine(void *agrs)
{
pthread_pool<T> *pp = (pthread_pool<T> *)agrs;
while (true)
{
T task;
pp->TaskPop(task);
std::cout << "线程:" << pthread_self() << "执行数据:" << task << "执行任务完成" << std::endl;
}
}
void PthreadPoolInit()
{
pthread_t id;
for (int i = 0; i < num_; i++)
{
pthread_create(&id, nullptr, routine, this );
}
}
};
}
基于ring队列的线程池实现
namespace ns_ring_queue_pthread_pool
{
template <class T>
class pthread_pool
{
const int queue_cap = 5;
private:
int num_;
std::vector<T> task_queue;
size_t p_pos;
size_t c_pos;
sem_t q_blank;
sem_t q_data;1
pthread_mutex_t mtx_;
pthread_mutex_t c_mutex;
pthread_mutex_t p_mutex;
private:
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void UnLock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
pthread_pool(int num )
: num_(num), c_pos(0), p_pos(0)
{
task_queue.resize(queue_cap);
pthread_mutex_init(&mtx_, nullptr);
pthread_mutex_init(&p_mutex,nullptr);
pthread_mutex_init(&c_mutex,nullptr);
sem_init(&q_blank, NULL, queue_cap);
sem_init(&q_data, NULL, 0);
std::cout<<"pthread_pool 初始化成功"<<std::endl;
}
~pthread_pool()
{
sem_destroy(&q_blank);
sem_destroy(&q_data);
pthread_mutex_destroy(&mtx_);
pthread_mutex_destroy(&c_mutex);
pthread_mutex_destroy(&p_mutex);
}
void TaskPop(T &out)
{
sem_wait(&q_data);
Lock(c_mutex);
out = task_queue[c_pos];
sem_post(&q_blank);
c_pos++;
c_pos %= queue_cap;
UnLock(c_mutex);
}
void TaskPush(T &in)
{
sem_wait(&q_blank);
Lock(p_mutex);
task_queue[p_pos] = in;
sem_post(&q_data);
p_pos++;
p_pos %= queue_cap;
UnLock(p_mutex);
}
static void *routine(void *agrs)
{
pthread_pool<T> *pp = (pthread_pool<T> *)agrs;
while (true)
{
T task;
pp->TaskPop(task);
std::cout << "线程:" << pthread_self() << "执行数据:" << task << "执行任务完成" << std::endl;
sleep(4);
}
}
void PthreadPoolInit()
{
pthread_t id;
for (int i = 0; i < num_; i++)
{
pthread_create(&id, nullptr, routine, this );
}
}
};
}
注意:
这里没有对线程池销毁线程操作,声明周期随进程。
设计单例模式线程池
线程池本身会在任何场景,任何环境下被调用,因此我们可以设计成单例模式,由一个单例对线程池进行管理。
#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
namespace ns_pthread_pool
{
const int g_default_val = 3;
template <class T>
class pthread_pool
{
private:
int num_;
std::queue<T> task_queue_;
pthread_cond_t task_queue_cond_;
pthread_mutex_t mtx_;
static pthread_pool<T> *ins_;
pthread_pool(int num = g_default_val )
: num_(num)
{
pthread_cond_init(&task_queue_cond_, nullptr);
pthread_mutex_init(&mtx_, nullptr);
}
private:
void Lock()
{
pthread_mutex_lock(&mtx_);
}
void UnLock()
{
pthread_mutex_unlock(&mtx_);
}
bool IsEmpty()
{
return task_queue_.empty();
}
void WakeUpThread()
{
pthread_cond_signal(&task_queue_cond_);
}
public:
static pthread_pool<T> *GetInstance()
{
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
if (ins_ == nullptr)
{
pthread_mutex_lock(&lock);
if (ins_ == nullptr)
{
ins_ = new pthread_pool<T>();
ins_->PthreadPoolInit();
std::cout << "该类第一次创建对象" << std::endl;
}
}
return ins_;
}
pthread_pool(const pthread_pool<T> &pool) = delete;
pthread_pool<T> &operator=(const pthread_pool<T> &pool) = delete;
~pthread_pool()
{
pthread_cond_destroy(&task_queue_cond_);
pthread_mutex_destroy(&mtx_);
}
void TaskPop(T &out)
{
Lock();
while (IsEmpty())
{
pthread_cond_wait(&task_queue_cond_, &mtx_);
}
out = task_queue_.front();
task_queue_.pop();
UnLock();
}
void TaskPush(T &in)
{
Lock();
task_queue_.push(in);
UnLock();
WakeUpThread();
}
static void *routine(void *agrs)
{
pthread_pool<T> *pp = (pthread_pool<T> *)agrs;
while (true)
{
T task;
pp->TaskPop(task);
std::cout << "线程:" << pthread_self() << "执行数据:" << task() << "执行任务完成" << std::endl;
}
}
void PthreadPoolInit()
{
pthread_t id;
for (int i = 0; i < num_; i++)
{
pthread_create(&id, nullptr, routine, this );
}
}
};
template <class T>
pthread_pool<T> *pthread_pool<T>::ins_ = nullptr;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)