TCP服务器epoll的多种实现

2023-11-02

TCP服务器epoll的多种实现

对于网络IO会涉及到两个系统对象

  1. 用户空间中进程或者线程
  2. 操作系统内核

比如发生read操作时就会经历两个阶段

  1. 等待数据就绪
  2. 将数据从内核缓冲区拷贝到用户缓冲区

由于各个阶段多有不同的情况,一组合么就产生了多种网络 IO 模型

阻塞IO

在Linux中默认所有socket都是blocking的,一个典型的读流程

image-20210713161136792

  1. 当应用进程调用read这个系统调用,如果数据没有到达,或者收到的数据包还不完整就会阻塞read调用,等待足够的数据到达

  2. Kernel准备好数据,他就会将数据从Kernel中拷贝到用户内存,Kernel返回结果,解除block状态,重新运行起来

    于是就有了下面这种服务结构

    111

代码实现一个简单的反射服务器:

#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#pragma clang diagnostic push
#pragma ide diagnostic ignored "EndlessLoop"
using std::cout;
using std::endl;
int main(int argc,char * argv[])
{
    //1.create socket
    int listenfd = socket(AF_INET,SOCK_STREAM,0);
    if(listenfd == -1)
    {
        cout<<"create listenfd failed"<<endl;
        return -1;
    }
    //2.Initialize server address
    struct sockaddr_in bindaddr{};
    bindaddr.sin_family =AF_INET;
    bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    bindaddr.sin_port= htons(3000);
    if (bind(listenfd,(struct  sockaddr*) &bindaddr, sizeof(bindaddr)) == -1)
    {
        cout<<"bind listen socket failed!"<<endl;
        return -1;
    }
    //3.Start listening
    if(listen(listenfd,SOMAXCONN) == -1)
    {
        cout<<"listen error"<<endl;
        return -1;
    }
    while (true)
    {
        sockaddr_in clientaddr{};
        socklen_t  clientaddrlen = sizeof(clientaddr);
        //4.accept client connect
        int clientfd = accept(listenfd,(struct sockaddr*)&clientaddr,&clientaddrlen);
        if (clientfd != -1)
        {
            //5.Receive data from the client
            char recvBuf[32]={0};
            int ret = recv(clientfd,recvBuf,32,0);
            if (ret > 0)
            {
                cout<<"Receive data from the client:"<<recvBuf<<endl;
                ret = send(clientfd,recvBuf, strlen(recvBuf),0);
                if(ret != strlen(recvBuf))
                    cout<<"send failed"<<endl;
                else
                    cout<<"send successfully"<<endl;
            }
            else
            {
                cout<<"Receive data error"<<endl;
            }
            close(clientfd);
        }
    }
    //7.close listen
    close(listenfd);
    return 0;
}
#pragma clang diagnostic pop

image-20210713162848960

但这样的架构有巨大的缺陷:

  • 因为所有IO都是阻塞的,这就造成send 过程中线程将被阻塞,会浪费大量的CPU时间,效率极低

非阻塞IO

在Linux下,我们可以主动将socket设置为非阻塞,这时流程就会编程下面这样

image-20210713163857960

返回值 含义
大于0 接收到的字节数
等于0 连接正常断开
等于-1,error等于EAGAIN 表示recv操作还没有完成
等于-1,error不等于EAGAIN 表示recv操作遇到系统错误

使用如下函数将socket设置为非阻塞状态

fcntl( fd, F_SETFL, O_NONBLOCK );

于是我们可以实现如下模型

image-20210713164813579

可以看到服务器线程可以通过循环调用 recv()接口,可以在单个线程内实现对所有连接的数据接收工作。但是上述模型绝不被推荐。因为,循环调用 recv()将大幅度推高 CPU 占用率;此外,在这个方案中 recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如 select()多路复用模式, 可以一次检测多个连接是否活跃

多路复用IO (IO multiplexing)

采用Linux中的select或者poll

下面我们以select举例

select函数用于检测一组socket中是否有事件就绪.这里的事件为以下三类:

  1. 读事件就绪
    • socket内核中,接收缓冲区中的字节数大于或者等于低水位标记SO_RCVLOWAT,此时调用recread函数可以无阻塞的读取该文件描述符,并且返回值大于零
    • TCP连接的对端关闭连接,此时本端调用rrecvread函数对socket进行读操作,recvread函数返回0
    • 在监听的socket上有新的连接请求
    • socket尚有未处理的错误
  2. 写事件就绪
    • socket内核中,发送缓冲区中的可用字节数大于等于低水位标记时,可以无阻塞的写,并且返回值大于0
    • socket的写操作被关闭时,对一个写操作被关闭的socket进行写操作,会触发SIGPIPE信号
    • socket使用非阻塞connect连接成功或失败时
  3. 异常事件就绪

select()如下:

#include <sys/select.h>   
    int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);

参数说明

nfds: Linux上的socket也叫作fd,将这个参数的值设置为所有需要使用select函数检测事件的fd中的最大值加1即nfds=max(fd1,fd2,...,fdn)+1
readfds: 需要监听可读事件的fd集合
writefds: 需要监听可写事件fd的集合
exceptfds: 需要监听异常事件的fd集合
timeout: 超时时间,即在这个参数设定的时间内检测这些fd的事件,超过这个时间后,select函数立即返回,这是一个timeval结构体

其定义如下:

struct timeval{      
        long tv_sec;   /*秒 */
        long tv_usec;  /*微秒 */   
    }

参数readfds,writefds,exceptfds的类型都是fd_set,这是一个结构体信息

定义如下

//#define __FD_SETSIZE		1024
#define __NFDBITS	(8 * (int) sizeof (__fd_mask))
#define	__FD_ELT(d)	((d) / __NFDBITS)
#define	__FD_MASK(d)	((__fd_mask) (1UL << ((d) % __NFDBITS)))

/* fd_set for select and pselect.  */
typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    //typedef long int __fd_mask;
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#endif
  } fd_set;

/* 最大数量`fd_set'.  */
#define	FD_SETSIZE		__FD_SETSIZE

假设未定义__USE_XOPEN整理一年

typedef struct
  {
//typedef long int __fd_mask;
    long int fds_bits[__FD_SETSIZE / __NFDBITS];
  } fd_set;

将一个fd添加到fd_set这个集合中时需要使用FD_SET宏,其定义如下:

void FD_SET(fd, fdsetp)

实现如下:

#define	FD_SET(fd, fdsetp)	__FD_SET (fd, fdsetp)

__FD_SET (fd, fdsetp)实现如下:

/* We don't use `memset' because this would require a prototype and   the array isn't too big.  */# define __FD_ZERO(set)  \  do {									      \    unsigned int __i;							      \    fd_set *__arr = (set);						      \    for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i)	      \      __FDS_BITS (__arr)[__i] = 0;					      \  } while (0)#endif	/* GNU CC */#define __FD_SET(d, set) \  ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))

举个例子,假设现在fd的值为43,那么在数组下表为0的元素中第43个bit被置为1


再Linux上,向fd_set集合中添加新的fd时,采用位图法确定位置;在windows中添加fd至fd_set的实现规则依次从数组第0个位置开始向后递增


也就是说,FD_SET宏本质上是在一个有1024个连续bit的数组的第fd位置置1.

同理,FD_CLR删除一个fd的原理,也就是将数组的第fd位置置为0

image-20210706144042275

image-20210713165838027

实例;

#include <sys/types.h>#include <sys/socket.h>#include <arpa/inet.h>#include <unistd.h>#include <iostream>#include <cstring>#include <sys/time.h>#include <vector>#include <cerrno>//Customize the value representing invalid fd#pragma clang diagnostic push#pragma ide diagnostic ignored "EndlessLoop"#define INVALID_FD -1int main(int argc,char * argv[]){    //create a listen socket    int listenfd = socket(AF_INET,SOCK_STREAM,0);    if(listenfd == INVALID_FD)    {        printf("创建监听socket失败");        return -1;    }    //init server addr    sockaddr_in bindaddr{};    bindaddr.sin_family = AF_INET;    bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);    bindaddr.sin_port= htons(3000);    if(bind(listenfd,(struct sockaddr*) &bindaddr, sizeof(bindaddr)) == -1)    {        printf("绑定socket失败");        close(listenfd);        return -1;    }    //start listen    if(listen(listenfd,SOMAXCONN) == -1)    {        printf("监听失败!");        close(listenfd);        return -1;    }    //Store the client's socket data    std::vector<int> clientfds;    int maxfd;    while(true)    {        fd_set readset;        FD_ZERO(&readset);        FD_SET(listenfd,&readset);        maxfd = listenfd;        unsigned long clientfdslength = clientfds.size();        for (int i = 0; i < clientfdslength; ++i)        {            if(clientfds[i] != INVALID_FD)            {                FD_SET(clientfds[i],&readset);                if(maxfd<clientfds[i])                    maxfd = clientfds[i];            }        }        timeval tm{};        tm.tv_sec = 1;        tm.tv_usec =0;        int  ret = select(maxfd+1,&readset, nullptr, nullptr,&tm);        if(ret == -1)        {            if (errno != EINTR)                break;        }        //time out        else if (ret ==0 )        {            continue;        }        else        {            //event detected on a socket            if (FD_ISSET(listenfd,&readset))            {                sockaddr_in clientaddr{};                socklen_t  clientaddrlen = sizeof(clientaddr);                //accept client connection                int clientfd = accept(listenfd,(struct sockaddr *)&clientaddr,&clientaddrlen);                if (clientfd == INVALID_FD)                {                    break;                }                std::cout<<"接受到客户端连接,fd:"<<clientfd<<std::endl;                clientfds.push_back(clientfd);            }            else            {                //Assume that the data length sent by the client is not greater than 63                char recvbuf[64];                unsigned long clientfdslength = clientfds.size();                for (int i = 0; i < clientfdslength; ++i)                {                    if(clientfds[i] != INVALID_FD && FD_ISSET(clientfds[i],&readset))                    {                        memset(recvbuf,0, sizeof(recvbuf));                        //accept data                        int length = recv(clientfds[i],recvbuf,64,0);                        //recv的返回值等于0,表示客户端关闭了连接                        if (length <=0 )                        {                            //error                            std::cout<<"error"<<clientfds[i]<<std::endl;                            close(clientfds[i]);                            clientfds[i] == INVALID_FD;                            continue;                        }                        std::cout<<"clientfd: "<<clientfds[i]<<", recv data:"<<recvbuf<<std::endl;                    }                }            }        }    }    //close all client socket    int clientfdslength = clientfds.size();    for (int i = 0; i < clientfdslength; ++i)    {        if(clientfds[i] != INVALID_FD)        {            close(clientfds[i]);        }    }    //close socket    close(listenfd);    return 0;}#pragma clang diagnostic pop

使用nc -v 127.0.0.1 3000来模拟客户端,打开三个终端

image-20210706165026755

关于以上代码,需要注意以下几点:

  1. select函数在调用前后可能会修改readfds,writefds,exceptfds所以想在下次调用select函数时服用这些fd_set变量需要重新清零,添加内容

    for (int i = 0; i < clientfdslength; ++i)        {            if(clientfds[i] != INVALID_FD)            {                FD_SET(clientfds[i],&readset);                if(maxfd<clientfds[i])                    maxfd = clientfds[i];            }        }
    
  2. select函数也会修改timeval结构体的值,如果想复用这些变量,需要重新设置

    timeval tm{};        tm.tv_sec = 1;        tm.tv_usec =0;
    
  3. 如果将selecttimeval参数设置为NULL,则select函数会一直阻塞下去

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TCP服务器epoll的多种实现 的相关文章

随机推荐

  • 机器学习面试题汇总(1~50题)

    机器学习面试题汇总 1 50题 1 深度神经网络预防过拟合的方法 2 SMOTE算法 过采样算法 3 为什么LR 逻辑回归 用sigmoid函数 4 LR损失函数 5 几种神经网络梯度下降方法 6 克莱姆法则 7 各种排序的时间复杂度 8
  • 【JDK】:Java容器框架——同步容器与并发容器

    前面的文章中详细介绍了Java的容器框架 在此基础上 本文对Java中的同步容器与并发容器做一些介绍 fail fast机制 快速报错机制 fail fast 能够防止多个进程同时修改同一个容器的内容 如果在你迭代遍历某个容器的过程中 另一
  • 云服务器维护工作,服务器日常维护工作

    服务器日常维护工作 内容精选 换一换 以昇腾 AI 处理器的PCIe的工作模式进行区分 如果PCIe工作在主模式 可以扩展外设 则称为RC模式 如果PCIe工作在从模式 则称为EP模式 昇腾 AI 处理器的工作模式如下 昇腾310 AI处理
  • Vuex学习总结

    Vuex学习总结 1 概述 说说我的理解 不一定对 由于前端的组件化开发常常难免涉及到组件之间的通信 对于一个小型项目来说 通过父子组件通信就够用了 但是 可能随着项目扩大 存在兄弟组件或者祖孙组件通信的通信 其解决方法通常有两种 中央bu
  • QML VideoOutput填充模式引发的问题

    QML Camera显示不全问题 BUG记录 QML使用Camera组件获取摄像头中的视频 使用VideoOutput显示 显示没有问题 但在Capture时 生成的图片明显比显示的东西多 真实生成的图片 在VideoOutput显示的部分
  • JMJS系统总结系列----JMJS中接口使用的技术(六)

    三同时接口 调用外部的webService A 添加WebService引用 using JMJS BusinessLogic IsSupervisionsServiceReference B 具体调用 public class IsSup
  • MySQL表操作:提高数据处理效率的秘诀(进阶)(1)

    生命不在于相信奇迹 而在于创造奇迹 朱学恒 作者 不能再留遗憾了 专栏 MySQL学习 本文章主要内容 MySQL对表操作进阶 数据库约束 表的设计 新增 后续会更新进阶表的查询 文章目录 前言 1 数据库约束 NULL约束 UNIQUE
  • VTK 测量类Widget的应用 与 vtkDistanceWidget 3D测试 问题

    vtkDistanceWidget 用于在二维平面上测量两点之间的距离 vtkAngleWidget 用于在平面的角度测量 AngleWidget 感觉这都是 2D控件 include
  • nvm use 无法切换npm的解决办法

    nvm正常的安装目录结构 没有正常切换时对应的文件结构 3 解决方法 nvm use 无效 即 nvm use 后执行nvm ls 当前使用的版本前正常是会有 标注 可是nvm use 所有版本前都没有 我当前使用的版本是 6 10 0 我
  • 编写一个类实现银行账户Account的概念

    编写一个类实现银行账户Account的概念 包含的属性有 帐号 密码 存款余额 利率rate 最小余额 定义封装这些属性的方法 账号要自动生成 编写主类 使用银行账户类 输入 输出3个储户的上述信息 考虑 哪些属性可以设计成static属性
  • 【APP自动化测试必知必会】Appium之微信小程序自动化测试

    本节大纲 H5 与小程序介绍 混合 App 元素定位环境部署 混合 App 元素操作 Airtest 测试 App 01 H5与小程序介绍 H5概述 H5 是指第 5 代 HTML 也指用 H5 语言制作的一切数字产品 所谓 HTML 是
  • springboot2整合redis超简单

    依赖
  • [开发

    同时支持中国手机和固定电话号码的正则表达式 const pattern 1 3 9 d 9 0 d 2 3 d 7 8 d 1 5 说明 表示字符串的开始 1 3 9 d 9 0 d 2 3 d 7 8 d 1 5 使用逻辑或 将两个正则表
  • 分布式任务调度框架xxljob2.2.0详细安装使用教程

    分布式任务调度框架xxljob2 2 0详细安装使用教程 简介 概述 特性 总体设计 源码目录介绍 调度数据库 配置 架构设计 设计思想 系统组成 架构图 安装 环境 创建目录 下载 解压 初始化调度数据库 调度数据库表说明 配置部署 调度
  • 安装Visio2010 64bit时提示不能安装32位版本的Office 2010 ,因为您当前已经安装了64位Office产品的解决方法(亲测可行)

    电脑是64位的 从网上找了64位的Visio2010但是安装的时候却提示不能安装32位版本的Office 2010 因为您当前已经安装了64位Office产品 如下 然后就找到了一个解决办法 链接 https pan baidu com s
  • 网页防篡改把服务器顶满,网页防篡改技术

    网页防篡改技术 网页防篡改技术包括时间轮询技术 核心内嵌技术 事件触发技术 文件过滤驱动技术等 时间轮询技术利用网页检测程序 以轮询方式读出要监控的网页 与真实网页相比较 来判断网页内容的完整性 对于被篡改的网页进行报警和恢复 但是由于目前
  • java多态练习_Java课堂练习——多态

    一 ATM机 import java util Scanner abstract class drawmoney public abstract double getMoney double a double n class drawmon
  • dbeaver设置mysql驱动

    dbeaver 默认是没有配置mysql驱动的 方式1 可访问外网 无办公防火墙拦截 直接联网更新驱动即可 方式2 有防火墙拦截 无法更新驱动 驱动属性 编辑驱动 选择本地的mysql驱动文件 可选择maven仓库中的mysql驱动jar
  • 程序员必知的23种设计模式之访问者模式

    文章目录 1 模式引出 测评系统需求 2 传统方案 2 1 传统方式代码 3 访问者模式基本介绍 3 1 UML原理类图 3 2 对原理类图的说明 即 访问者模式的角色及职责 4 方案修改 5 双分派 6 访问者模式的注意事项和细节 6 1
  • TCP服务器epoll的多种实现

    TCP服务器epoll的多种实现 对于网络IO会涉及到两个系统对象 用户空间中进程或者线程 操作系统内核 比如发生read操作时就会经历两个阶段 等待数据就绪 将数据从内核缓冲区拷贝到用户缓冲区 由于各个阶段多有不同的情况 一组合么就产生了