收录博文中看到的已经封装好的文件时,方便各位查看
Log.hpp
日志信息
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level== DEBUG) return;
#endif
// va_list ap;
// va_start(ap, format);
// while()
// int x = va_arg(ap, int);
// va_end(ap); //ap=nullptr
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
// vprintf(format, args);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
// FILE *fp = fopen(LOGFILE, "a");
printf("%s%s\n", stdBuffer, logBuffer);
// fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
// fclose(fp);
}
Thread.hpp
对线程接口的简单封装
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
class ThreadData
{
public:
void* args_;
std::string name_;
};
typedef void *(*fun_t)(void *);
//对线程进行封装
class Thread
{
public:
//线程编号 回调函数是什么 给回调函数穿的参数是什么
Thread(int num,fun_t callback,void*args)
:func_(callback)
{
char nameBuffer[64];
snprintf(nameBuffer,sizeof nameBuffer,"Thread-%d",num);
name_=nameBuffer;
td_.name_=name_;
td_.args_=args;
}
//创建线程
void start()
{
pthread_create(&tid_,nullptr,func_,(void*)&td_);
}
void join()
{
pthread_join(tid_,nullptr);
}
std::string name()
{
return name_;
}
~Thread()
{}
private:
std::string name_;
fun_t func_;
ThreadData td_;
pthread_t tid_;
};
lockGuard.hpp
对锁的接口的简单封装
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
// std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
// std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
ThreadPool.hpp
实现的线程池
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <unistd.h>
#include "Thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_thread_num=10;
template<class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &lock;
}
bool isEmpty()
{
return task_queue_.empty();
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
private:
ThreadPool(int thread_num=g_thread_num)
:num_(thread_num)
{
//创造线程的空间 构造线程
for(int i=1;i<=num_;i++)
{
//每个线程的编号 回调函数 输出型参数
threads_.push_back(new Thread(i,routine,this)); //nullptr后续会改
}
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&cond,nullptr);
}
ThreadPool(const ThreadPool<T> &other) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete;
public:
// 考虑一下多线程使用单例的过程
static ThreadPool<T> *getThreadPool(int num = g_thread_num)
{
// 可以有效减少未来必定要进行加锁检测的问题
// 拦截大量的在已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为
if (nullptr == thread_ptr)
{
lockGuard lockguard(&mutex);
// 但是,未来任何一个线程想获取单例,都必须调用getThreadPool接口
// 但是,一定会存在大量的申请和释放锁的行为,这个是无用且浪费资源的
// pthread_mutex_lock(&mutex);
if (nullptr == thread_ptr)
{
thread_ptr = new ThreadPool<T>(num);
}
// pthread_mutex_unlock(&mutex);
}
return thread_ptr;
}
//回调函数 相当于消费者
static void* routine(void* args)
{
ThreadData* td=(ThreadData*)(args);
ThreadPool<T>* tp=(ThreadPool<T>*)td->args_;
while(true)
{
T task;
{
lockGuard lockguard(tp->getMutex()/*&lock*/);
while(tp->isEmpty()/*task_queue_.empty()*/) tp->waitCond();
// 读取任务
task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
}
task(td->name_);
}
}
//创造线程 pthread_create
void run()
{
for(auto& iter:threads_)
{
iter->start();
std::cout<<iter->name()<<" 启动成功"<<std::endl;
}
}
//相当于生产者
void pushTask(const T& task)
{
lockGuard lockguard(&lock);
task_queue_.push(task);
pthread_cond_signal(&cond);
}
~ThreadPool()
{
for(auto& iter:threads_)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread*> threads_;
int num_;
std::queue<T> task_queue_;
pthread_mutex_t lock;//保护临界区(任务队列)的一把锁
pthread_cond_t cond;
static ThreadPool<T> *thread_ptr;
static pthread_mutex_t mutex;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
Sock.hpp
对套接字编程所用接口的简单封装。
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <memory>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
#include "log.hpp"
class Sock
{
private:
const static int gbacklog = 20; //后面再说
public:
Sock()
{}
int Socket()
{
int listensock=socket(AF_INET,SOCK_STREAM,0);
if(listensock<0)
{
logMessage(FATAL, "create socket error, %d:%s", errno, strerror(errno));
exit(2);
}
logMessage(NORMAL, "create socket success, listensock: %d", listensock);
return listensock;
}
void Bind(int sock,uint16_t port,std::string ip="0.0.0.0")
{
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_port);
inet_pton(AF_INET, _ip.c_str(), &local.sin_addr);
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno));
exit(3);
}
}
void Listen(int sock)
{
if(listen(sock,gbacklog)<0)
{
logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno));
exit(4);
}
logMessage(NORMAL, "init server success");
}
// ip 和 port为输出型参数
//返回提供服务的sock
int Accept(int listensock,std::string *ip,uint16_t* port)
{
struct sockaddr_in src;
socklen_t len=sizeof(src);
int servicesock=accept(listensock,(struct sockaddr*)&src,&len);
if(servicesock < 0)
{
logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno));
return -1;
}
if(port) *port=ntohs(src.sin_port);
if(ip) *ip=inet_ntoa(src.sin_addr);
return servicesock;
}
//建立连接
bool Connect(int sock,const std::string &server_ip,const uint16_t &server_port)
{
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(server_port);
server.sin_addr.s_addr=inet_addr(server_ip.c_str());
if(connect(sock,(struct sockaddr*)&server,sizeof(server))==0) return true;
else return false;
}
~Sock()
{}
};
TcpServer.hpp
#pragma once
#include "Sock.hpp"
#include <vector>
#include <functional>
#include <pthread.h>
namespace ns_tcpserver
{
using func_t=std::function<void(int)>; //范围值为void 参数类型为int
class TcpServer;
class ThreadData
{
public:
ThreadData(int sock,TcpServer* server)
:sock_(sock)
,server_(server)
{}
TcpServer* server_;
int sock_;
};
class TcpServer
{
private:
static void* ThreadRoutine(void* args)
{
pthread_detach(prhread_self());
ThreadData *td=static_cast<ThreadData*>(args);
td->server_->Excute(td->sock_);
close(td->sock_);
return nullptr;
}
public:
TcpServer(const uint16_t& port, const std::string& ip="0.0.0.0" )
{
listensock_=sock_.Socket();
sock_.Bind(listensock_,port,ip);
sock_.Listen(listensock_);
}
//绑定任务
void BindService(func_t func)
{
func_.push_back(func);
}
//执行任务
void Excute(int sock)
{
for(auto& f:func_)
{
f(sock);
}
}
//运行服务器
void Start()
{
for(;;)
{
std::string clientip;
uint16_t clientport;
int sock=sock_.Accept(listensock_,&clientip,&clientport);
if(sock==-1) continue;
logMessage(NORMAL, "create new link success, sock: %d", sock);
ThreadData *td = new ThreadData(sock, this);
pthread_t tid;
pthread_create(&tid,nullptr,ThreadRoutine,td);
}
}
~TcpServer()
{
if(listensock_>0)
{
close(listensock_);
}
}
private:
int listensock_;
Sock sock_;
std::vector<func_t> func_;
};
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)