线程池
Global.h
#pragma once
// Pool size used by the ThreadPool default constructor.
const int DEFAULT_POOL_SIZE = 10;

// Values held by the thread pool's state flag.
const int STARTED           = 0;
const int STOPPED           = 1;
Mutex.h
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include <vector>
#include <errno.h>
#include <string.h>
#include "Global.h"
using namespace std;
/**
 * Thin wrapper around a POSIX pthread_mutex_t.
 *
 * The wrapped mutex is initialised by the constructor and destroyed by the
 * destructor, so each Mutex object must be the sole owner of its
 * pthread_mutex_t. Copying is therefore disabled: a copy would duplicate the
 * raw pthread_mutex_t and both objects would destroy it.
 */
class Mutex
{
public:
    Mutex();
    ~Mutex();

    /// Acquires the wrapped mutex.
    void lock();

    /// Releases the wrapped mutex.
    void unlock();

    /// Returns the address of the wrapped pthread mutex (e.g. for
    /// pthread_cond_wait). The pointer is valid only while this object lives.
    pthread_mutex_t* get_mutex_ptr();

private:
    // Non-copyable (C++03 idiom): declared private and intentionally never
    // defined, so any attempt to copy fails at compile or link time.
    Mutex(const Mutex&);
    Mutex& operator=(const Mutex&);

    pthread_mutex_t m_lock;   // the wrapped POSIX mutex
    volatile bool is_locked;  // flag written by lock()/unlock()
};
Mutex.cpp
#include "Mutex.h"
// Builds the wrapper: the flag starts out as "not locked" and the pthread
// mutex is initialised with default attributes (NULL attribute pointer).
Mutex::Mutex()
    : is_locked(false)
{
    pthread_mutex_init(&m_lock, NULL);
}
/**
 * Destroys the wrapped pthread mutex.
 *
 * Precondition: the mutex is unlocked and no other thread is still using it.
 *
 * The destructor deliberately neither spins on the is_locked flag nor calls
 * unlock(): spinning on a volatile flag is not a valid synchronisation
 * mechanism (and hangs forever if the destroying thread is the one holding
 * the lock), and unlocking a mutex that the calling thread does not hold is
 * undefined behaviour for a default pthread mutex.
 */
Mutex::~Mutex()
{
    pthread_mutex_destroy(&m_lock);
}
void Mutex::lock()
{
pthread_mutex_lock(&m_lock);
is_locked = true;
}
void Mutex::unlock()
{
is_locked = false;
pthread_mutex_unlock(&m_lock);
}
pthread_mutex_t* get_mutex_ptr()
{
return &m_lock;
}
CondVar.h
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include "Global.h"
using namespace std;
/**
 * Thin wrapper around a POSIX pthread_cond_t.
 *
 * The condition variable is initialised by the constructor and destroyed by
 * the destructor, so each CondVar must be the sole owner of its
 * pthread_cond_t. Copying is disabled because a copy would duplicate the raw
 * pthread_cond_t and both objects would destroy it.
 */
class CondVar
{
public:
    CondVar();
    ~CondVar();

    /// Waits on the condition variable using the given mutex
    /// (forwarded to pthread_cond_wait).
    void wait(pthread_mutex_t* mutex);

    /// Forwards to pthread_cond_signal.
    void signal();

    /// Forwards to pthread_cond_broadcast.
    void broadcast();

private:
    // Non-copyable (C++03 idiom): declared private and intentionally never
    // defined, so any attempt to copy fails at compile or link time.
    CondVar(const CondVar&);
    CondVar& operator=(const CondVar&);

    pthread_cond_t m_cond_var;  // the wrapped POSIX condition variable
};
CondVar.cpp
#include "CondVar.h"
// Initialises the wrapped condition variable with default attributes
// (NULL attribute pointer).
CondVar::CondVar()
{
    pthread_cond_init(&this->m_cond_var, NULL);
}
// Destroys the wrapped condition variable.
CondVar::~CondVar()
{
    pthread_cond_destroy(&this->m_cond_var);
}
void CondVar::signal()
{
pthread_cond_signal(&m_cond_var);
}
void CondVar::wait(pthread_mutex_t* mutex)
{
pthread_cond_wait(&m_cond_var, mutex);
}
void CondVar::broadcast()
{
pthread_cond_broadcast(&m_cond_var);
}
Task.h
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include <errno.h>
#include <string.h>
#include "Global.h"
using namespace std;
/**
 * A unit of work: a plain function pointer paired with the opaque argument
 * that is handed to it when the task is executed.
 */
class Task
{
public:
    /// Stores the callback and the argument to pass to it.
    Task(void (*fn_ptr)(void*), void* arg);
    ~Task();

    /// Invokes the stored callback with the stored argument.
    void run();

    /// Function-call form; invokes the stored callback with the stored argument.
    void operator()();

private:
    void (*m_fn_ptr)(void*);  // callback to invoke
    void* m_arg;              // argument forwarded to the callback
};
Task.cpp
#pragma once
#include "Task.h"
/**
 * Creates a task from a callback and its argument.
 *
 * @param fn_ptr Function invoked when the task is run.
 * @param arg    Opaque pointer passed to fn_ptr on invocation; it is stored
 *               as-is and not copied.
 */
Task::Task(void (*fn_ptr)(void*), void* arg)
    : m_fn_ptr(fn_ptr), m_arg(arg)
{
    // The constructor needs a real (empty) body here; the previous "();"
    // after the initializer list was a syntax error.
}
// Empty destructor: nothing is released here, in particular the stored
// argument pointer is left untouched.
Task::~Task()
{
}
// Function-call operator: invokes the stored callback with the stored
// argument.
void Task::operator()()
{
    void (*callback)(void*) = m_fn_ptr;
    callback(m_arg);
}
// Named equivalent of the function-call operator: invokes the stored
// callback with the stored argument.
void Task::run()
{
    void (*callback)(void*) = m_fn_ptr;
    callback(m_arg);
}
ThreadPool.h
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <iostream>
#include <errno.h>
#include <vector>
#include "Mutex.h"
#include "Task.h"
#include "CondVar.h"
#include "Global.h"
/**
 * Fixed-size pool of pthread workers that consume Task objects from a shared
 * queue.
 *
 * The pool joins its worker threads on shutdown, so it must be the only
 * owner of those thread ids; copying is therefore disabled (a copy would
 * make two objects join the same threads).
 */
class ThreadPool
{
public:
    /// Creates a pool configured for DEFAULT_POOL_SIZE workers.
    ThreadPool();

    /// Creates a pool configured for pool_size workers.
    ThreadPool(int pool_size);

    /// Shuts the pool down if it has not been stopped already.
    ~ThreadPool();

    /// Spawns the worker threads. Returns 0 on success, -1 if a thread could
    /// not be created.
    int initialize_threadpool();

    /// Marks the pool as stopped, wakes the workers and joins them.
    int destroy_threadpool();

    /// Worker loop run by every pool thread.
    void* execute_thread();

    /// Queues a task for execution by a worker.
    int add_task(Task* task);

private:
    // Non-copyable (C++03 idiom): declared private and intentionally never
    // defined, so any attempt to copy fails at compile or link time.
    ThreadPool(const ThreadPool&);
    ThreadPool& operator=(const ThreadPool&);

    int m_pool_size;               // number of worker threads to create
    Mutex m_task_mutex;            // guards m_tasks and m_pool_state
    CondVar m_task_cond_var;       // signalled on new tasks and on shutdown
    vector<pthread_t> m_threads;   // ids of the created worker threads
    deque<Task*> m_tasks;          // pending tasks, consumed front-first
    volatile int m_pool_state;     // STARTED or STOPPED
};
ThreadPool.cpp
#pragma once
#include "ThreadPool.h"
/**
 * Constructs a pool configured for DEFAULT_POOL_SIZE workers.
 *
 * No threads are created here. The state starts as STOPPED so that the
 * destructor, which inspects m_pool_state, does not read an uninitialised
 * value and does not attempt a shutdown on a pool that was never started.
 */
ThreadPool::ThreadPool()
    : m_pool_size(DEFAULT_POOL_SIZE), m_pool_state(STOPPED)
{
    cout << "Constructed ThreadPool of size " << m_pool_size << endl;
}
/**
 * Constructs a pool configured for the requested number of workers.
 *
 * No threads are created here. The state starts as STOPPED so that the
 * destructor, which inspects m_pool_state, does not read an uninitialised
 * value and does not attempt a shutdown on a pool that was never started.
 *
 * @param pool_size Number of worker threads to create on initialisation.
 */
ThreadPool::ThreadPool(int pool_size)
    : m_pool_size(pool_size), m_pool_state(STOPPED)
{
    cout << "Constructed ThreadPool of size " << m_pool_size << endl;
}
// Shuts the pool down on destruction unless its state is already STOPPED.
ThreadPool::~ThreadPool()
{
    const bool already_stopped = (m_pool_state == STOPPED);
    if (!already_stopped) {
        destroy_threadpool();
    }
}
extern "C"
void* start_thread(void* arg)
{
ThreadPool* tp = (ThreadPool*)arg;
tp->execute_thread();
return NULL;
}
// Marks the pool as STARTED and spawns m_pool_size worker threads, each
// entering through start_thread with this pool as its argument.
// Returns 0 when every thread was created; on the first pthread_create
// failure the error code is reported on cerr and -1 is returned (threads
// created before the failure stay recorded in m_threads).
int ThreadPool::initialize_threadpool()
{
    m_pool_state = STARTED;

    int created = 0;
    while (created < m_pool_size) {
        pthread_t worker;
        const int rc = pthread_create(&worker, NULL, start_thread, static_cast<void*>(this));
        if (rc != 0) {
            cerr << "pthread_create() failed: " << rc << endl;
            return -1;
        }
        m_threads.push_back(worker);
        ++created;
    }

    cout << m_pool_size << " threads created by the thread pool" << endl;
    return 0;
}
/**
 * Stops the pool: flags it as STOPPED, wakes every waiting worker and joins
 * all worker threads that were actually created.
 *
 * Calling it again after the workers were joined is harmless, because the
 * list of thread ids is cleared once they have been joined.
 *
 * @return Always 0.
 */
int ThreadPool::destroy_threadpool()
{
    // Change the state under the task mutex so a worker cannot miss the
    // transition between testing the state and starting to wait.
    m_task_mutex.lock();
    m_pool_state = STOPPED;
    m_task_mutex.unlock();

    cout << "Broadcasting STOP signal to all threads..." << endl;
    m_task_cond_var.broadcast(); // notify all threads we are shutting down

    // Join only the threads that exist: m_threads can hold fewer than
    // m_pool_size entries if initialisation failed part-way or never ran.
    const size_t thread_count = m_threads.size();
    for (size_t i = 0; i < thread_count; i++) {
        void* result;
        const int ret = pthread_join(m_threads[i], &result);
        // pthread_join reports failure through its return value, not errno.
        cout << "pthread_join() returned " << ret << ": " << strerror(ret) << endl;
        m_task_cond_var.broadcast();
    }

    // Forget the joined ids so they are never joined a second time.
    m_threads.clear();

    cout << thread_count << " threads exited from the thread pool" << endl;
    return 0;
}
void* ThreadPool::execute_thread()
{
Task* task = NULL;
cout << "Starting thread " << pthread_self() << endl;
while(true) {
cout << "Locking: " << pthread_self() << endl;
m_task_mutex.lock();
while ((m_pool_state != STOPPED) && (m_tasks.empty())) {
cout << "Unlocking and waiting: " << pthread_self() << endl;
m_task_cond_var.wait(m_task_mutex.get_mutex_ptr());
cout << "Signaled and locking: " << pthread_self() << endl;
}
if (m_pool_state == STOPPED) {
cout << "Unlocking and exiting: " << pthread_self() << endl;
m_task_mutex.unlock();
pthread_exit(NULL);
}
task = m_tasks.front();
m_tasks.pop_front();
cout << "Unlocking: " << pthread_self() << endl;
m_task_mutex.unlock();
(*task)();
delete task;
}
return NULL;
}
/**
 * Appends a task to the queue and wakes one waiting worker.
 *
 * The worker that picks the task up runs it and then deletes it, so the
 * task must have been allocated with new.
 *
 * @param task Task to enqueue; must not be NULL.
 * @return 0 when the task was queued, -1 when task is NULL.
 */
int ThreadPool::add_task(Task* task)
{
    // A NULL entry would be dereferenced by a worker thread later on, so
    // reject it here instead of queueing it.
    if (task == NULL) {
        return -1;
    }

    m_task_mutex.lock();
    m_tasks.push_back(task);
    m_task_cond_var.signal();
    m_task_mutex.unlock();
    return 0;
}
Makefile
# Output locations for object files and the example/test executable.
OBJPATH=bin/obj
EXAMPLEPATH=bin/example

.PHONY: all

# Compiles every translation unit to an object file, then links them with
# threadpool_test.cpp into the test executable.
#  - the output directories are created first so a fresh checkout builds;
#  - -pthread is used for both compiling and linking (a -l option has no
#    effect on a compile-only "-c" invocation);
#  - the executable is written inside $(EXAMPLEPATH)/ (the separator was
#    missing, which produced "bin/examplethreadpool_test").
all:
	mkdir -p $(OBJPATH) $(EXAMPLEPATH)
	g++ -pthread -c CondVar.cpp -o $(OBJPATH)/CondVar.o
	g++ -pthread -c Mutex.cpp -o $(OBJPATH)/Mutex.o
	g++ -pthread -c Task.cpp -o $(OBJPATH)/Task.o
	g++ -pthread -c ThreadPool.cpp -o $(OBJPATH)/ThreadPool.o
	g++ -pthread $(OBJPATH)/CondVar.o $(OBJPATH)/Mutex.o $(OBJPATH)/Task.o $(OBJPATH)/ThreadPool.o threadpool_test.cpp -o $(EXAMPLEPATH)/threadpool_test