基本概念
IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:
(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。
(2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
(3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
(4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
(5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。
select 函数
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)
- 第一个参数maxfdp1指定待测试的描述字个数,它的值是待测试的最大描述字加1(因此把该参数命名为maxfdp1),描述字0、1、2…maxfdp1-1均将被测试。
- 中间的三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述字。如果对某一个的条件不感兴趣,就可以把它设为空指针。struct fd_set可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:
void FD_ZERO(fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
int FD_ISSET(int fd, fd_set *fdset);
(1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
(2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
(3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。
select函数注意事项
- 文件的描述符的监视范围与第一个函数参数有关
- 函数的超时时间与select函数的最后一个结构体 timeval有关
- select函数只有在监视的文件描述符发生变化才返回,如果未发生变化,就会发生阻塞,指定超时就是为了防止上述情况
- 将准备好的fd_set变量reads的内容复制到temps变量,因为调用函数后除了发生变化的描述符对应的位为1,其他的都为0,也就是说客户端在线,但是没有发生通讯,这个时候要记住这个在线的客户端就必须要
进行复制,这是使用select的通用方法。 - timeval每次循环都会被初始化一次
- select 的函数返回值只有标记作用,只表示发生变化的活跃的描述符数目,要想知道哪些描述符发生变化,还得必须遍历FD_SET,代码如下:
while(1)
{
timeout.tv_sec=30;
timeout.tv_usec=0;
temp_set=read_set;
result=select(fd_max+1,&temp_set,NULL,NULL,&timeout);
if(result==-1)
{
std::cout<<"select error!"<<std::endl;
break;
}
if(result==0)
{
std::cout<<"server waiting time out!"<<std::endl;
continue;
}
for(int index=0;index!=fd_max+1;index++)
{
if(FD_ISSET(index,&temp_set))
{
if(index==fd_server)
{
handle_on();
}
else
{
handle_task(index);
}
}
}
}
}
上面的例子result知识起一个判断作用,要想获取详细信息,还得遍历FD_SET,从0到fd_max+1;
当管道一端被关闭时候,遵循下面两条规则:
(1) 当读一个写端已经被关闭的管道,在所有的数据都被读取后,read返回0,表示文件结束
(2)如果写一个读端已经被关闭的管道,则产生信号SIGPIPE,该信号可以被捕捉。
下面的代码就是父进程通知服务器客户端已经下线,服务器端则关闭子进程的两个管道的读写描述符,子进程读到0后自动返回,退出循环。
void chat()
{
fd_write=open(m_pipe_wpath.c_str(),O_WRONLY);
fd_read=open(m_pipe_rpath.c_str(),O_RDONLY);
if(fork()==0)
{
close(fd_server);
close(fd_write);
while(bzero(msg,1024),read(fd_read,msg,1024)!=0)
{
std::cout<<msg<<std::endl;
}
close(fd_read);
exit(0);
}
close(fd_read);
while(bzero(msg,1024),fgets(msg,1024,stdin)!=NULL)
{
write(fd_write,msg,strlen(msg));
}
}
select多路复用服务器流程图
代码区域
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <signal.h>
#include <json/json.h>
#include <stdexcept>
#include <iostream>
#include <sstream>
#define FD_MAX 1023
namespace SELECT
{
struct Client_info
{
Client_info():m_next(NULL){}
int m_pid;
int m_read;
int m_write;
std::string m_name;
Client_info* m_next;
};
void sighandle(int signum)
{
std::cout<<"Capture signal is "<<signum<<std::endl;
}
class Select
{
public:
Select(const std::string pipe_path):m_head(NULL)
{
fd_server=open(pipe_path.c_str(),O_RDONLY);
if(fd_server==-1)
{
throw std::runtime_error("open pipe failed");
}
fd_max=fd_server;
signal(SIGPIPE,sighandle);
}
void init()
{
FD_ZERO(&read_set);
FD_ZERO(&temp_set);
FD_SET(fd_server,&read_set);
}
void handle_on()
{
char recv_msg[1024];
read(fd_server,recv_msg,1024);
std::string msg(recv_msg);
if(msg.find("on")!=std::string::npos)
{
online(msg);
}
}
void online(const std::string& msg)
{
Json::Value value;
reader.parse(msg,value,false);
Client_info* inode=new Client_info();
inode->m_pid=value["pid"].asInt();
inode->m_name=value["name"].asString();
inode->m_read=open(value["pipe_w"].asString().c_str(),O_RDONLY);
inode->m_write=open(value["pipe_r"].asString().c_str(),O_WRONLY);
if(inode->m_read>fd_max)
fd_max=inode->m_read;
inode->m_next=m_head;
m_head=inode;
FD_SET(inode->m_read,&read_set);
std::stringstream ss;
ss<<inode->m_name<<" is on line !"<<std::endl;
std::string onmsg=ss.str();
std::cout<<onmsg;
dispatch(onmsg,inode->m_pid);
}
void offline(const std::string& msg)
{
Json::Value value;
reader.parse(msg,value,false);
Client_info* search=m_head;
Client_info* pre=NULL;
while(search)
{
if(search->m_pid==value["pid"].asInt())
{
break;
}
pre=search;
search=search->m_next;
}
FD_CLR(search->m_read,&read_set);
std::stringstream ss;
ss<<search->m_name<<" off line normal"<<std::endl;
std::string offmsg=ss.str();
std::cout<<offmsg;
dispatch(offmsg,search->m_pid);
close(search->m_read);
close(search->m_write);
if(pre==NULL)
{
m_head=search->m_next;
search->m_next=NULL;
delete search;
}
else
{
pre->m_next=search->m_next;
delete search;
}
}
void dropoff(const int& read)
{
Client_info* search=m_head;
Client_info* pre=NULL;
while(search)
{
if(search->m_read==read)
{
break;
}
pre=search;
search=search->m_next;
}
FD_CLR(search->m_read,&read_set);
std::stringstream ss;
ss<<search->m_name<<" drop off line !"<<std::endl;
std::string offmsg=ss.str();
std::cout<<offmsg;
dispatch(offmsg,search->m_pid);
close(search->m_read);
close(search->m_write);
if(pre==NULL)
{
m_head=search->m_next;
search->m_next=NULL;
delete search;
}
else
{
pre->m_next=search->m_next;
delete search;
}
}
void handle_task(const int& index)
{
char recv_buf[1024]="";
int recv_len=read(index,recv_buf,1024);
if(recv_len==0)
{
dropoff(index);
}
else if(recv_len>0)
{
std::string msg(recv_buf);
if(msg.find("off")!=std::string::npos)
{
offline(msg);
}
else
{
sendmsg(msg);
}
}
}
void sendmsg(const std::string& msg)
{
Json::Value msginfo;
reader.parse(msg,msginfo,false);
int msg_pid=msginfo["pid"].asInt();
std::string clientname=msginfo["name"].asString();
std::string clientmsg=msginfo["msg"].asString();
std::stringstream ss;
ss<<clientmsg<<"from client :"<<clientname<<std::endl;
std::string msgmsg=ss.str();
dispatch(msgmsg,msg_pid);
}
void dispatch(const std::string& msg,const int& pid)
{
Client_info* broadcast=m_head;
while(broadcast)
{
if(broadcast->m_pid!=pid)
{
write(broadcast->m_write,msg.c_str(),msg.size());
}
broadcast=broadcast->m_next;
}
}
void wait()
{
init();
std::cout<<"server waiting for connecting"<<std::endl;
while(1)
{
timeout.tv_sec=30;
timeout.tv_usec=0;
temp_set=read_set;
result=select(fd_max+1,&temp_set,NULL,NULL,&timeout);
if(result==-1)
{
std::cout<<"select error!"<<std::endl;
break;
}
if(result==0)
{
std::cout<<"server waiting time out!"<<std::endl;
continue;
}
for(int index=0;index!=fd_max+1;index++)
{
if(FD_ISSET(index,&temp_set))
{
if(index==fd_server)
{
handle_on();
}
else
{
handle_task(index);
}
}
}
}
}
private:
Client_info* m_head;
fd_set read_set;
fd_set temp_set;
struct timeval timeout;
int result;
int fd_server;
int fd_max;
Json::FastWriter writer;
Json::Reader reader;
};
}
#include"select_pro.hpp"
int main(int argc,char** argv)
{
SELECT::Select* server=new SELECT::Select(argv[1]);
server->init();
server->wait();
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <json/json.h>
#include <stdexcept>
#include <iostream>
#include <sstream>
namespace CLIENT
{
class Client
{
public:
Client(const std::string& server_path,const std::string& pipe_rpath,const std::string& pipe_wpath,const std::string& name):m_pipe_rpath(pipe_rpath),m_pipe_wpath(pipe_wpath),m_name(name)
{
fd_server=open(server_path.c_str(),O_WRONLY);
if(fd_server==-1)
{
perror("open fd_server");
}
}
void connect()
{
std::cout<<"connect server"<<std::endl;
Json::Value info;
info["name"]=m_name;
info["pid"]=getpid();
info["pipe_r"]=m_pipe_rpath;
info["pipe_w"]=m_pipe_wpath;
info["type"]="on";
std::string jsonfile=writer.write(info);
write(fd_server,jsonfile.c_str(),jsonfile.size());
}
void chat()
{
fd_write=open(m_pipe_wpath.c_str(),O_WRONLY);
fd_read=open(m_pipe_rpath.c_str(),O_RDONLY);
if(fork()==0)
{
close(fd_server);
close(fd_write);
while(bzero(msg,1024),read(fd_read,msg,1024)!=0)
{
std::cout<<msg<<std::endl;
}
close(fd_read);
exit(0);
}
close(fd_read);
while(bzero(msg,1024),fgets(msg,1024,stdin)!=NULL)
{
write(fd_write,msg,strlen(msg));
}
}
void offline()
{
Json::Value info;
info["pid"]=getpid();
info["type"]="off";
std::string jsonfile=writer.write(info);
write(fd_write,jsonfile.c_str(),jsonfile.size());
std::cout<<"waiting........"<<std::endl;
wait(NULL);
std::cout<<"waited !"<<std::endl;
close(fd_server);
close(fd_write);
}
private:
int fd_write;
int fd_read;
int fd_server;
std::string m_name;
std::string m_pipe_rpath;
std::string m_pipe_wpath;
Json::FastWriter writer;
Json::Reader reader;
char msg[1024];
};
}
#include"client_pro.hpp"
int main(int argc,char** argv)
{
CLIENT::Client* client=new CLIENT::Client(argv[1],argv[2],argv[3],argv[4]);
client->connect();
client->chat();
client->offline();
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)