Netty 单机百万连接测试

2023-05-16

1.Netty框架简介

1.1.Netty简介

netty是jboss提供的一个java开源框架,netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可用性的网络服务器和客户端程序。也就是说netty是一个基于nio的编程框架,使用netty可以快速的开发出一个网络应用。

由于java 自带的nio api使用起来非常复杂,并且还可能出现 Epoll Bug,这使得我们使用原生的nio来进行网络编程存在很大的难度且非常耗时。但是netty良好的设计可以使开发人员快速高效的进行网络应用开发。

1.2.Netty主要特性

  • 统一的API接口,支持多种传输类型,例如OIO,NIO
  • 简单而强大的线程模型
  • 丰富的文档
  • 卓越的性能
  • 拥有比原生Java API 更高的性能与更低的延迟
  • 基于池化和复用技术,使资源消耗更低
  • 安全性
  • 完整的SSL/TLS以及StartTLS支持
  • 可用于受限环境,如Applet以及OSGI

1.3.Netty和Tomcat的区别

Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过codec自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。

有人说netty的性能就一定比tomcat性能高,其实不然,tomcat从6.x开始就支持了nio模式,并且后续还有APR模式——一种通过jni调用apache网络库的模式,相比于旧的bio模式,并发性能得到了很大提高,特别是APR模式,而netty是否比tomcat性能更高,则要取决于netty程序作者的技术实力了。

1.4.BIO编写Client-Server通信

1、BIOServer服务端

public class BioServer {
    private static final int PORT = 8080;

    public static void main(String[] args) throws IOException {

        //新建socketServer
        ServerSocket serverSocket = null;

        try{
            //绑定对应端口
            serverSocket = new ServerSocket(PORT);
            System.out.println("the time server is start in port :"+PORT);
            Socket socket = null;
            while(true){
                //拿到请求进来的socket
                socket = serverSocket.accept();
                //线程请求
                new Thread(new TimeServerHandler(socket)).start();
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (serverSocket != null){
                System.out.println("the time server close");
                serverSocket.close();
            }
        }
    }
}

2、TimeServerHandler统一时间服务

public class TimeServerHandler implements Runnable{

    private Socket socket;

    public TimeServerHandler(Socket socket) {
        this.socket = socket;
    }

    public TimeServerHandler() {
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            //为true时autoFlush自动刷新,无需在调用flush方法
            out = new PrintWriter(this.socket.getOutputStream(),true);

            String body  = null;
			//循环监听客户端发送的msg
            while ((body = in.readLine())!=null && body.length()!=0){
                System.out.println("this time server receive msg :"+body);
                out.println(new Date().toString());
            }

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(in != null){
                try {
                    in.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(this.socket != null){
                try {
                    this.socket.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

3、BIOcClient客户端

public class BioClient {

    private static final String HOST = "127.0.0.1";

    private static final int PORT = 8080;

    public static void main(String[] args) {
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;

        try{
            //创建连接
            socket = new Socket(HOST,PORT);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println("I am client");
            String resp = in.readLine();
            System.out.println("当前服务器时间是:"+resp);

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(in != null){
                try {
                    in.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(socket != null){
                try {
                    socket.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

在这里插入图片描述

在这里插入图片描述

BIO的优点就是模型简单,编码简单,缺点是性能瓶颈,请求数和线程数保持一致,当有N个请求发送过来,服务端需要开启N个线程去处理,高并发场景下CPU线程切换上下文损耗大。

2.常见的网络IO模型

2.1.用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件的所有权限。为例保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分成两部分,一部分为内核空间,一部分为用户空间。只能对Linux操作系统而言,将较高的1G字节供内核使用,称为内核空间,而将较低的3G字节供各个进程使用称为用户空间。

2.2.文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表达指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,他是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

2.3.TCP发送数据的流程

在这里插入图片描述

  • 第一步:应用A把消息发送到TCP发送缓冲区。
  • 第二步:TCP发送缓冲区把消息发送出去,经过网络传输,消息发送到B服务器的TCP接收缓冲区。
  • 第三步:B再从TCP接收缓冲区中去读取属于自己的数据。

2.4.阻塞/非阻塞,同步/异步

**(1)同步阻塞:**发送方发送请求之后一直等待响应。接收方处理请求时进行的IO操作如果不能马上等到返回结果,就会一致等到返回
结果后,才响应发送方,期间不能进行其他工作。

在这里插入图片描述

**(2)同步非阻塞:**发送方发送请求后,一致等待响应。接受方处理请求时进行的IO操作如果不能马上得到结果,就立即返回,去做其他事情但是由于没有得到请求处理结果,不响应发送方,发送方一致等待。当IO操作完成后,将完成状态和结果通知接收方,接收方在响应发送方,发送方才进入下一次请求过程。

在这里插入图片描述

**(3)异步阻塞:**发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到记过,就会一直等到返回结果后,才响应发送方,期间不能进行其他操作。

**(4)异步非阻塞:**发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到结果,也不等待,而是马上返回去做其他的事情。当IO操作完成后,将完成的状态和结果通知接收方,接收方再响应发送方。

在这里插入图片描述

2.5.Linux中五种I/O模型

  • IO的操作也就是应用程序从TCP缓冲区中读取数据的时候。
  • 网络I/O的本质是socket的读取,socket在linux中被抽象为流,I/O可以理解为对流的操作。对于一次I/O访问,数据会先被拷贝到操作系统的内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说当一个read操作发生时,他会经历两个阶段:
第一阶段:等待数据准备(Waiting for the data to be ready)
第二阶段:将数据从内核拷贝到进程中(Copy the data from the kernel to the process)

对于socket流而言:

第一步:通常涉及等待网络上的数据分组到达,然后被复制到内核的某个缓冲区
第二步:把数据从内核缓冲区复制到进程缓冲区

1、阻塞IO

(1)什么是阻塞I/O

阻塞IO就是当应用程序向TCP缓冲区发起读取数据申请时,在内核数据没有准备好之前,应用程序会一致处于等待数据的状态,直到内核把数据准备好交给应用程序才结束。

**术语描述:**在应用程序调用recvfrom读取数据时,其系统调用直到数据包到达别并且被复制到应用缓冲区中或者发生错误时才返回,此期间一致处于等待,进程从调用直到返回这段时间被阻塞的成为阻塞IO。

(2)阻塞I/O流程

在这里插入图片描述

  • 第一步:应用程序向内核发起recvfrom读取数据
  • 第二步:准备数据报(应用进程阻塞)
  • 第三步:将数据从内核复制到应用空间
  • 第四步:复制完成后,返回成功提示

2、非阻塞I/O

(1)什么是非阻塞I/O

非阻塞I/O就是当应用程序发起读取数据时,如果内核没有准备好数据报,会返回给应用程序,不会让应用程序一致等待,但是应用程序要时不时去尝试调用,当数据包准备好时,将数据从内核复制到用户空间,这个过程也是同步的,阻塞的。

**术语描述:**非阻塞I/O是在应用调用recvfrom读取数据时,如果缓冲区中没有数据的话,就会直接返回一个EWOULDBLOCK错误,不会让应用一致等待。在没有数据时会即刻返回错误标识,那也意味着如果应用要读取数据就需要不断的调用recvfrom请求,直到读取到它要的数据为止。

(2)非阻塞I/O流程

在这里插入图片描述

  • 第一步:应用进程向内核发起recvfrom读取数据
  • 第二步:没有数据报准备好,即刻返回EWOULDBLOCK错误码
  • 第三步:应用程序向内核再次发起recvfrom读取数据
  • 第四步:已有数据报就从内核拷贝到用户空间,否则还是返回错误码

3、I/O多路复用

(1)什么时I/O多路复用

I/O多路复用的思路就是系统提供了一种函数可以同时监控多个网络请求的操作,这个函数就是我们常说的select、poll、epoll函数,有了这个函数后,应用线程通过调用select函数就可以同时监控多个网络请求,select函数监控的网络请求中只要有任何一个数据状态准备就绪了,select函数就会返回可读状态,这时询问线程再去通知处理数据的线程,对应线程此时再发起recvfrom请求去读取数据。

**术语描述:**进程通过将一个或者多个网络请求传递给select,阻塞在select操作之上,select帮我们侦测多个网络请求是否准备就绪,当有网络请求准备就绪时,select返回数据可读状态,应用程序在调用recvfrom读取数据。

(2)I/O多路复用流程

在这里插入图片描述

  • 第一步:进程发起网络请求到select函数调用进行阻塞
  • 第二步:select函数调用内核获取数据报
  • 第三步:select函数监控的网络请求中只要有任何一个数据状态准备就绪了,select函数就会返回可读状态
  • 第四步:询问线程再去通知处理数据的线程,对应线程在次发起recvfrom请求去读取数据

4、信号驱动I/O

(1)什么是信号驱动I/O

信号驱动I/O不是循环请求询问的方式去监控数据就绪状态,而是调用sigaction时候建立一个SIGIO的信号联系,当内核数据准备好之后在通过SISGIO信号通知线程数据准备好后的可读状态,当线程收到可读状态的信号后,此时在向内核发起recvfrom读取数据的请求,因为信号驱动I/O的模型下应用线程在发出信号监控后即可返回,不会阻塞,所以这样的方式下,一个应用线程也可以控制多个网络请求。

**术语描述:**首先开启套接字信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数,此时请求即可返回,当数据准备就绪时,就生成对应进程的SIGIO信号,通过信号回调通知应用线程调用recvfrom来读取数据。

(2)信号驱动I/O流程

在这里插入图片描述

  • 第一步:进程建立SIGIO的信号处理程序调用sigaction,然后返回
  • 第二步:内核准备好数据递交SIGIO给信号处理程序
  • 第三步:应用程序收到信号后,调用数据拷贝,复制完成返回数据报

5、异步I/O

(1)什么是异步I/O

异步I/O应用只需要向内核发送一个read请求,告诉内核他要读取数据后即刻返回,内核收到请求后会建立一个信号联系,但数据准备就绪,内核会主动把数据从内核复制到用户空间,等所有操作都完成之后,内核会发起一个通知告诉应用,处理数据报完成。

**术语描述:**应用告知内核启动某个操作,并让内核在整个操作完成之后,通知应用,这种模型与信号驱动的主要区别在于信号驱动I/O是由内核通知我们何时开始下一个I/O,而异步I/O模型是由内核通知我们操作什么时候完成。

(2)异步I/O流程

在这里插入图片描述

  • 第一步:异步I/O应用只需要向内核发送一个read请求,告诉内核他要读取数据后即刻返回
  • 第二步:内核收到请求后会建立一个信号联系,但数据准备就绪
  • 第三步:内核会主动把数据从内核复制到用户空间,等所有操作都完成之后,内核会发起一个通知告诉应用,处理数据报完成

2.6.I/O多路复用之select,poll,epoll

1、select、poll、epoll简介

目前支持I/O多路复用的系统调用有 select,pselect,poll,epoll,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作但select,pselect,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

epoll跟select都能提供多路I/O复用的解决方案。在现在的Linux内核里有都能够支持,其中epoll是Linux所特有,而select则应该是POSIX所规定,一般操作系统均有实现。

2、select函数

(1)基本原理

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);

select函数监视的文件描述符分为3类,分别是writefds、readfds和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fd_set,来找到就绪的文件描述符。

  • maxfdp1:待测试的文件描述符的个数,它的值是待测试的最大值加1

  • readfds:select监视的可读文件句柄集合

  • writefds:select监视的可写文件句柄集合

  • exceptfds:select监视的异常文件句柄集合

  • timeout:本次select()的超时结束时间。

(2)fd_set

select()提供了一种fd_set的核心数据结构,实际上是一个long类型的数组,每一个数组元素都能与一个打开的文件句柄(不管是Socket句柄,还是其他文件/命令管道/设备句柄)建立联系。当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一个socket或文件可读。

(3)select函数的优缺点

select目前几乎所有平台上支持,其良好的跨平台支持也是它的一个优点。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点:

  • select最大的缺陷就是单个进程所打开的fd是有一定限制的,它由FD_SETSIZE设置,默认值是1024。
一般来说这个数目和系统的内存关系很大,具体数目可以cat /proc/sys/fs/file-max 查看,32位默认是1024,64位默认是2048
  • 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。
当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都要遍历一遍,这会浪费很多CPU时间。
  • 需要维护一个用来存放大量fd的数据结构,这样会使用户空间和内核空间在传递该结构时复制开销大。

(4)select()总结

从执行过程来看,使用基于select的IO多路复用和同步阻塞IO没有太大的区别,而且多添加了监视socket以及调用select函数额外的操作,按理说效率更低。但是,select()可以让用户在一个线程内同时处理多个socket的IO请求,用户可以注册多个感兴趣的socket,然后不断的调用select轮询被激活的socket,即可达到单线程处理多个IO请求的目的。而在同步阻塞的的模型中,必须通过多线程的方式才能达到目的。

3、poll函数

(1)基本原理

int poll(struct pollfd *fds,nfds_t nfds,int timeout);

typedef struct pollfd{
	int fd;			//需要被检测或选择的文件描述符
	short events;   //对文件描述符fd上感兴趣的事件
	short revents;	//文件描述符fd上当前实际发生的事件
} pollfd_t;

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态。如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后并没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后他又要再次遍历fd,这个过程经历了多次无谓的遍历。

  • poll函数参数说明:
    • pollfd *fds:pollfd类型的数组,指向一个结构体数组的第0个元素,用于存放需要检测状态的socket描述符,并且调用poll函数之后fds数组不会被清空。
    • nfds_t nfds:数组fds中描述符的总数量。
    • timeout:超时连接。
    • pollfd:表示一个被监视的文件描述符,通常传递fds执行poll()监视多个文件描述符。
    • events:指定监视fd的事件(输入,输出,错误),是监视该文件描述符的事件掩码,由用户来设置。
    • revents:文件描述符的操作结果事件掩码,内核在调用返回时设置。

它没有最大连接数限制,原因是它是基于链表来存储的,但是同样有一个缺点:

1、大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义的。
2、poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降,同样包含大量文件描述符的数组依然会被整体从用户态复制到内核空间,而且内核也要遍历数组,对效率改善不大。

在这里插入图片描述

4、epoll函数

epoll是在2.6内核中提出的。是之前的select和poll的增强版本。相比于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

(1)基本原理

epoll()是基于事件驱动的I/O方式,是Linux内核位处理大批量文件描述符而作了改进的poll,其实现机制与select/poll机制完全不同。epoll()没有描述符个数限制,它使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的拷贝操作只需要一次。

epoll()通过在内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个操作部分,在linux中,这三个部分对应的函数如下所示:

int epoll_create(int size);

int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event);

int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);
  • epoll_create:负责建立一个epoll对象,在epoll文件系统中为了句柄对象分配资源。参数size表明内核要监听的描述符数量。
  • epoll_ctl:负责向内核的epoll对象中添加要监听的事件类型(文件描述符),已添加的描述符被维护在一颗红黑树上。
  • epoll_wait:负责收集已就绪事件的连接。

eventpoll

struct eventpoll{
    ...
    //红黑树的根节点,树中存储这所有添加到epoll中需要被监听的事件
    struct rb_root rbr;
    //双链表,存放这通过epoll_wait()返回的就绪事件
    struct list_head rdlist;
}
每个epoll对象都有一个独立的eventpoll,用于存放通过epoll_ctl()添加进来的事件。这些事件维护在红黑树中,红黑树的插入时间效率是log(n)(n为树的高度)
此外,被监听的事件都会与设备驱动程序建立回调关系,每当被监听的事件就绪,系统注册的回调函数就会被调用,将就绪事件放到rdList中,时间复杂度为O(1).
当调用epoll_wait()时,无需遍历整个被监听的描述符集,只需要遍历eventpoll对象中的rdlist双链表中是否有epitem元素即可。然后就把就绪事件复制到用户态,同时将事件数量返回给用户。
struct epitem{
    struct rb_node  rbn;         // 红黑树节点
    struct list_head    rdllink; // 双向链表节点
    struct epoll_filefd  ffd;    // 事件句柄信息
    struct eventpoll *ep;        // 指向所属的eventpoll对象
    struct epoll_event event;    // 期待发生的事件类型
}

epoll除了提供水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这使得用户空间程序有可能缓存IO状态,减少epoll_wait的调用,提高应用程序效率。

  • **水平触发(LT):**默认工作模式,当epoll_wait检测到某描述符事件就绪并通知应用进程时,应用进程可以不立即处理该事件,下次调用epoll_wait时,会再次通知进程。
  • 边缘触发(ET): 当epoll_wait检测到某描述符事件就绪并通知应用进程时,应用进程必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。这减少了同一事件的触发次数,使效率更高。

在这里插入图片描述

selectpollepoll
操作方式遍历遍历回调
底层实现数组链表红黑树
IO效率每次调用都进行线性遍历,时间复杂度为O(n)每次调用都进行线性遍历,时间复杂度为O(n)事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到readyList里面,时间复杂度O(1)
最大连接数1024(x86)或2048(x64)无上限无上限
fd拷贝每次调用select,都需要把fd集合从用户态拷贝到内核态每次调用poll,都需要把fd集合从用户态拷贝到内核态调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝

显示详细信息

2.7.Java的I/O演进历史

1、jdk1.4之前是采用同步阻塞模型,也就是BIO
    大型服务一般采用C或者C++, 因为可以直接操作系统提供的异步IO,AIO

2、jdk1.4推出NIO,支持非阻塞IO,jdk1.7升级,推出NIO2.0,提供AIO的功能,支持文件和网络套接字的异步IO

2.8.Reactor三种线程模型

设计模式——Reactor模式(反应器设计模式),是一种基于事件驱动的设计模式,在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求
一般出现在高并发系统中,比如Netty,Redis等

1、单线程模型

单个线程以非阻塞IO或事件IO处理所有IO事件,包括连接、读、写、异常、关闭等等。单线程Reactor模型基于同步事件分离器来分发事件,这个同步事件分离器,可以看作是一个单线程的while循环。下图描述了单线程模型的处理过程:

在这里插入图片描述

注意上面的Selector之所以会有OP_ACEEPT事件,是因为在单线程模型中,Selector轮询的时监听套接字与已连接客户端套接字的所有IO事件。

单线程处理所有IO事件的弊端很明显。没能利用计算机CPU多核的特性,一个线程某个时刻只能处理单个IO事件,此时如果有其他描述符IO事件就绪,这些IO事件将暂时得不到处理。

c++框架libevent中,基于event_base_loop做消息轮询,使用event_base_dispatch来分发IO消息,本质上是对上述模型的封装。如果不适用evthread_use_pthreads,则其默认的就是单线程模型处理请求。

2、多线程模型

一个线程/进程接收连接、一组线程/进程处理IO读写事件。也就是将accept的线程与处理读、写等IO事件的线程分离,并且使用m多个线程以非阻塞IO或者事件IO来处理n个套接字的IO事件,这里的n一般远大于m,线程数m一般取CPU逻辑核心数的1-3倍,而套接字数n则取决于请求数和进程可以打开的最大描述符个数。下图是多线程模型:

在这里插入图片描述

可以看到,这里把客户端的已连接套接字,转交给某个IO线程之后,由此线程轮询处理其他之后的所有IO事件,这实际参考了netty4的线程模型设计。实际reactor的多线程模型,并不需要将已连接套接字绑定在某个线程上,也可以统一放在连接池中,由多个IOWork线程从池中取连接进行轮询并处理,但这样会复杂很多,而且容易出问题,比如说不同线程从同一个channel收到了write事件,这就类似惊群问题了;并且多线程并发操作同一个channel,后续很可能需要你将IO事件进行同步,与其如此,不如直接将channel绑定到一个线程,让channel上触发与处理IO事件逻辑上同步。netty3中channel(已连接套接字)入站事件由固定线程处理,出站事件由触发的线程处理,netty4中修改了设计,将channel绑定到固定的eventloop(线程)。

另外一点,每个已连接套接字的IO事件由固定线程处理,不代表事件也一定由此线程触发,恰恰相反,实际业务中,读(入站)事件来自于客户端写数据触发,而写(出站)事件往往由别的线程触发,例如在发起一个异步mysql操作完成之后,在异步回调线程中写结果数据来触发套接字的出站。

3、主从多线程模型

一组线程/进程接收连接、一组线程/进程处理IO读写事件。它与多线程模型的主要区别在于使用一组线程或进程在一个共享的监听套接字上accept连接。这么做的原因是为了应付**单个线程/进程不足以快速处理内核中监听套接字的已连套接字队列(并发量极大)**的情况。如下:

在这里插入图片描述

4、Netty支持的线程模型

  • Netty支持单线程、多线程模型、主从多线程模型。

在这里插入图片描述

  • 初始化NioEventLoopGroup , 将为ServerSocketChannel 提供一个 bossGroup 线程池,为 SockerChannel 的I/O 事件处理 提供一个workGroup
  • 使用ServerBootstrap 绑定端口等相关信息,此时会初始化一个ServerSocketChannel 和 bossGroup,并且将 ServerSocketChannel 绑定到 bossGroup 中的一个NioEventLoop 中进行监听客户端的连接请求
  • 当 Client 发起连接请求时,首先经过三次握手通过后,然后服务端被触发,接着收到连接成功的通知(因为是异步所以是触发)
  • ServerSocketChannel 收到连接成功的通知后,将建立好的连接交给 workGroup中的某个NioEventLoop,然后将感兴趣的事件注册到 该 NioEventLoop 持有的Selector上,等待Client 下一次请求
  • 当 Client 发起 READ/WRITE 相关的请求时,则提交给NioEventLoop 进行处理

3.Netty搭建Echo服务

3.1.什么是Echo服务

  • 就是一个应答服务(回显服务器),客户端发送什么数据,服务端就响应对应的数据,常用于检测和调试服务。

搭建Netty项目

  • 创建maven项目,加入netty的依赖包
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.32.Final</version>
        </dependency>

3.2.Echo服务搭建

1、Echo服务端搭建-EchoServer

  • 绑定端口
  • 创建主从线程组
  • 启动线程组,指定通道类型,处理传过来的数据内容
  • 监听端口,关闭端口
  • 释放线程
public class EchoServer{
    //设定端口号
    private int port;
    
    //构造方法传入端口
    public EchoServer(int port){
        this.port = port;
    }
    
    public void run() throws InterruptedException {
        
        //创建线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        
        try{
        	//创建启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //设置线程组
            serverBootstrap.group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>(){
                    protected void initChannel(SocketChannel socketChannel) throws Exception{
                        //设置处理器,可以设置多个
                        socketChannel.pipeline().addLast(new EchoServerHandler());
                    }
                });
            
           	System.out.println("Echo服务启动中...");
            //绑定端口,同步等待
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
             
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        }finally{
            //优雅退出
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefullt();
        }
        
    }
    
    public static void main(String[] args) throws InterruptedException {
    	
        //设置默认的端口
        int port = 8080;
        if(args.length>0){
            port = Integer.parseInt(args[0]);
        }    
        
        //调用启动方法
        new EchoServer(port).run();
    }
}

2、Echo服务端的处理器-EchoServerHandler

public EchoServerHandler extends ChannelInboundHandlerAdapter{
    
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        ByteBuf data = (ByteBuf) msg;
        System.out.println("服务端收到数据:"+data.toString(CharsetUtil.UTF_8));
        //注意:数据一定要回写出去,不然客户端收不到
        ctx.writeAndFlush(data);
    } 
    
   @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoServerHandler EchoServerHandler()");
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3、Echo客户端搭建-EchoClient

public class EchoClient{
    
    //客户端发起请求的ip地址
    private String host;
    
    //端口
    private int port;
    
    //构造方法传入ip+端口初始化
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    
    public void start(){
        //创建线程组
        EventLoopGroup group = new NioEventLoopGroup();
        
        try{
            //创建启动引导类
            BootStrap bootStrap = new BootStrap();
            
            bootStrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host,port))
                .handler(new ChannelInitializer<SocketChannel>{
            	 @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //设置客户端处理器
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }   
            });
            //异步连接,同步阻塞,connect是异步连接
            ChannelFuture channelFuture = bootStrap.connect().sync();
            
            //阻塞住直到客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        }finally{
            //优雅退出
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] args){
        //启动客户端连接
        new EchoClient("127.0.0.1",8080).start();
    } 
}

4、EchoClient处理器-EchoClientHandler

  • 与服务端处理类继承的不一样,但是实质是一样的
  • 打印顺序是,先走Active()方法,先激活,标识服务端建立了通道
  • 读取服务端的数据
  • 读取完成,进入channelReadComplete()方法
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        System.out.println("Client received:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoClientHandler.channelActive()");
        //创建一个缓存,使用Unpooled工具类
        ctx.writeAndFlush(Unpooled.copiedBuffer("李祥",CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoClientHandler.channelReadComplete()");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

5、Echo服务中名词解析

(1)EventLoop和EventLoopGroup

线程和线程组,前者可以理解为线程,后者可以理解为线程组。

(2)BootStrap

启动引导类,用于配置线程组,启动的时候,同时启动线程组,以及开启通道,初始化通道。和一些处理。

(3)channel

channel是客户端和服务端建立的一个连接,是一个socket连接,具有生命周期,建立成功,读取数据,读取完成,出现异常等等。

(4)channelHandler和channelPipeline

channelHandler是做处理的,对接收的数据进行处理到要直接或间接继承channelHandler。channelPipeline就好比一个处理工厂,可以添加很多handler处理类,当进入channelHandler后都要经过pipeline添加的handler进行处理。

6、测试

在这里插入图片描述

在这里插入图片描述

4.Netty核心源码分析

4.1.Java NIO之Selector

1、什么是Selector

Selector(选择器)是java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

2、为什么使用Selector

仅用单线程来处理多个channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程来处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统一些资源(如内存),因此使用的线程越少越好。

3、Selector的创建

通过调用Selector.open()方法创建一个Selector,如下:

Selector selector = Selector.open();

4、向Selector注册通道

为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:

ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
SelecttionKey key = channel.register(selector,SelectionKey.OP_READ);

与Selector一起使用时,Channel必须处于非阻塞模式下,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道可以。

注意register()方法的第二个参数。这是一个interest集合,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

  • Connect:SelectionKey.OP_CONNECT “连接就绪”
  • Accept:SelectionKey.OP_ACCEPT “接收就绪”
  • Read:SelectionKey.OP_READ “读就绪”
  • Write:SelectionKey.OP_WRITE “写就绪”

如果不止对一种事件感兴趣,可以用”位或“操作符将常量符拼接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

5、SelectionKey

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些属性:

(1)interest集合

interest集合就是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,用“位与”操作interest 集合和给定的SelectionKey常量,可以确定某个确定的事件是否在interest 集合中。

int interestSet = selectionKey.interestOps();

//判断接收接续的事件是否在集合中
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
//判断连接就绪的事件是否在集合中
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
//判断读就绪事件是否在集合中
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
//判断写就绪事件是否在集合中
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

(2)ready集合

ready集合是通道已经准备就绪的操作的集合,在一次选择(Selection)之后,你会首先访问这个ready Set。

//获取读就绪事件的个数
int readySet = selectionKey.readyOps();

可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

//判断接收就绪的事件
selectionKey.isAcceptable();
//判断连接就绪的事件
selectionKey.isConnectable();
//判断读就绪的事件
selectionKey.isReadable();
//判断写就绪事件
selectionKey.isWriteable();

(3)Channel+Selector

从SelectionKey访问Channel和Selector很简单.

//获取当前channel
Channel channel = selectionKey.channel();

//获取当前selector
Selector selector = selectionKey.selector();

(4)附加对象

可以将一个对象或者更多的信息附着在SelectionKey上,这样就能方便的识别某个给定的通道,例如,可以附加与通道一起使用的Buffer,或者包含聚集数据的某个对像。

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以通过register()方法像Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector,SelectionKey.OP_READ,thObject);

6、通过Selector选择通道

一旦向Selector注册了一个或者多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(连接、接受、读或者写)已经准备就绪的那些通道。

(1)int select()

阻塞到至少有一个通道在你注册的事件上就绪了。

(2)int select(long timeout)

和select一样,处理最长会阻塞timeout毫秒(参数)

(3)int selectNow()

不会阻塞,不管什么通道就绪都立即返回,如果没有通道可以选择,此方法直接返回零

select()方法返回的int值表示有多少通道已经准备就绪。

7、selectedKeys()

一旦调用select()方法后,并且返回值有一个或多个准备就绪了,然后就可以通过调用selector的selectedKeys()方法,访问已选择键集中的就绪通道。

Set selectedKeys = selector.selectedKeys();

当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

这个循环遍历已选择键集中的每个键,并检测各个键锁对应的通道的就绪事件。

注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。

SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。

8、wakeUp()

某个线程调用select()方法阻塞后,即使没有通道已经就绪,也有办法让其从select()中返回,只要让其他线程在第一个线程调用select()方法的哪个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

9、close()

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

10.完整示例

//创建ServerSocketChannel对象
ServerSocketChannel channel = ServerSocketChannel.open();
//创建selector对像
Selector selector = Selector.open();
//设置非阻塞
channel.configureBlocking(false);
//绑定通道,返回读就绪的事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
   //查询当前已经有多少读就绪的事件
  int readyChannels = selector.select();
  //如果都就绪事件为0推出当前循环
  if(readyChannels == 0) continue;
  //调用select方法后,只要有就绪的通道,就开始遍历通道
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    //判断是否是那四种事件类型,做除响应的处理
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}

4.2.EventLoop和EventLoopGroup

1、EventLoop和EventLoopGroup简介

为解决系统在运行中频繁切换上下文带来的性能的损耗。而且要考虑并发下数据的安全。Netty采用了串行化的设计理念,从消息的读取、编码以及后续ChannelHandler的执行,始终都由IO线程EventLoop负责,这就意味着整个流程不会进行线程上下文的切换,数据也不会面临被并发修改的风险。

EventLoopGroup是一组EventLoop的抽象,一个EventLoopGroup当中包含一个或者多个EventLoop,EventLoopGroup提供next接口,可以从一组EventLoop里卖弄按照一定的规则获取其中一个EventLoop来进行工作。

2、分析这两块代码

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

这里创建了两个EventLoopGroup对象bossGroup和workGroup,bossGroup主要用来处理server socket监听client socket的连接请求,server socket接收了新的连接后,会把connection socket放到workGroup中进行处理,也就是说workGroup主要用来处理connection socket的网络IO和相关的业务逻辑。workGroup会由next选择其中一个EventLoop来讲这个SocketChannel注册到其维护的Selector并对后续的IO事件进行处理。

ChannelPipeline中每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。所以至关重要不要阻塞这个线程。

总结:

  • NioEventLoopGroup实际上就是个线程池,一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有的EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册一个EventLoop
  • 每一个EventLoop负责处理一个或者多个Channel
  • 一个EventLoop维护一个Selector

3、跟踪NioEventLoopGroup构造函数

(1)NioEventLoopGroup

 public NioEventLoopGroup(int nThreads) {
     this(nThreads, (Executor) null);
 }
    
 public NioEventLoopGroup() {
     this(0);
 }

(2)进入到NioEventLoopGroup的父类MultithreadEventLoopGroup

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

可以看见NioEventGroup的构造函数如果nThreads为非0值时,则为传入的实际值,如果为0的话或者没有参数,则为DEFAULT_EVENT_LOOP_THREADS,DEFAULT_EVENT_LOOP_THREADS为系统内核的2倍。

    static {
        //获取系统的内核*2付给DEFAULT_EVENT_LOOP_THREADS,静态块加载
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4.3.Netty的启动引导类

Netty的启动类分为客户端启动类和服务端启动类,分别是Bootstrap和ServerBootStrap。他们都是AbstractBootStrap的子类,总的来说他们都是Netty中的辅助类,提供了链式配置方法,方便Channel的引导和启动。

1、服务端启动引导类ServerBootStrap

(1)group:设置线程模型,Reactor线程模型对比EventLoopGroup

  • 先看一下ServerBootStrap的group方法

    • 双参group方法

    在这里插入图片描述

    • 单参group方法

    在这里插入图片描述

  • 单线程方式

在这里插入图片描述

  • 多线程方式

在这里插入图片描述

  • 主从线程方式

在这里插入图片描述

(2)channel:设置channel通道类型NioServerSocketChannel、OioServerSocketChannel

  • 用来设置交互模型的,5种IO模型

(3)option:作用于每个新建立的channel,设置TCP连接中的一些参数

  • ChannelOption.SO_BACKLOG:存放已完成三次握手的请求的等待队列的最大长度。
  • 设置ChannelOption.SO_BACKLOG参数

在这里插入图片描述

  • Linux服务器TCP连接底层:
    • syn queue:半连接队列,tcp_max_syn_backlog
    • accept_queue:全连接队列,net.core.somaxconn
  • 系统默认的somaxconn参数要足够大,如果backlog比somaxconn大,则会优先用后者。

在这里插入图片描述

  • ChannelOption.TCP_NODELAY:为解决Nagle算法的问题,默认是false,要求高实时性,有数据时马上发送,就将该选项设置为true打开Nagle算法。

(4)childOption: 作用于被accept之后的连接

(5)childHandler:用于对每个通道里面的数据处理

在这里插入图片描述

2、客户端启动引导类Bootstrap

在这里插入图片描述

3、客户端与服务端的引导过程

(1)引导服务器

在这里插入图片描述

(2)引导客户端

在这里插入图片描述

4.Netty核心源码分析

4.1.Java NIO之Selector

1、什么是Selector

Selector(选择器)是java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

2、为什么使用Selector

仅用单线程来处理多个channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程来处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统一些资源(如内存),因此使用的线程越少越好。

3、Selector的创建

通过调用Selector.open()方法创建一个Selector,如下:

Selector selector = Selector.open();

4、向Selector注册通道

为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:

ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
SelecttionKey key = channel.register(selector,SelectionKey.OP_READ);

与Selector一起使用时,Channel必须处于非阻塞模式下,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道可以。

注意register()方法的第二个参数。这是一个interest集合,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

  • Connect:SelectionKey.OP_CONNECT “连接就绪”
  • Accept:SelectionKey.OP_ACCEPT “接收就绪”
  • Read:SelectionKey.OP_READ “读就绪”
  • Write:SelectionKey.OP_WRITE “写就绪”

如果不止对一种事件感兴趣,可以用”位或“操作符将常量符拼接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

5、SelectionKey

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些属性:

(1)interest集合

interest集合就是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,用“位与”操作interest 集合和给定的SelectionKey常量,可以确定某个确定的事件是否在interest 集合中。

int interestSet = selectionKey.interestOps();

//判断接收接续的事件是否在集合中
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
//判断连接就绪的事件是否在集合中
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
//判断读就绪事件是否在集合中
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
//判断写就绪事件是否在集合中
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

(2)ready集合

ready集合是通道已经准备就绪的操作的集合,在一次选择(Selection)之后,你会首先访问这个ready Set。

//获取读就绪事件的个数
int readySet = selectionKey.readyOps();

可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

//判断接收就绪的事件
selectionKey.isAcceptable();
//判断连接就绪的事件
selectionKey.isConnectable();
//判断读就绪的事件
selectionKey.isReadable();
//判断写就绪事件
selectionKey.isWriteable();

(3)Channel+Selector

从SelectionKey访问Channel和Selector很简单.

//获取当前channel
Channel channel = selectionKey.channel();

//获取当前selector
Selector selector = selectionKey.selector();

(4)附加对象

可以将一个对象或者更多的信息附着在SelectionKey上,这样就能方便的识别某个给定的通道,例如,可以附加与通道一起使用的Buffer,或者包含聚集数据的某个对像。

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以通过register()方法像Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector,SelectionKey.OP_READ,thObject);

6、通过Selector选择通道

一旦向Selector注册了一个或者多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(连接、接受、读或者写)已经准备就绪的那些通道。

(1)int select()

阻塞到至少有一个通道在你注册的事件上就绪了。

(2)int select(long timeout)

和select一样,处理最长会阻塞timeout毫秒(参数)

(3)int selectNow()

不会阻塞,不管什么通道就绪都立即返回,如果没有通道可以选择,此方法直接返回零

select()方法返回的int值表示有多少通道已经准备就绪。

7、selectedKeys()

一旦调用select()方法后,并且返回值有一个或多个准备就绪了,然后就可以通过调用selector的selectedKeys()方法,访问已选择键集中的就绪通道。

Set selectedKeys = selector.selectedKeys();

当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

这个循环遍历已选择键集中的每个键,并检测各个键锁对应的通道的就绪事件。

注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。

SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。

8、wakeUp()

某个线程调用select()方法阻塞后,即使没有通道已经就绪,也有办法让其从select()中返回,只要让其他线程在第一个线程调用select()方法的哪个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

9、close()

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

10.完整示例

//创建ServerSocketChannel对象
ServerSocketChannel channel = ServerSocketChannel.open();
//创建selector对像
Selector selector = Selector.open();
//设置非阻塞
channel.configureBlocking(false);
//绑定通道,返回读就绪的事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
   //查询当前已经有多少读就绪的事件
  int readyChannels = selector.select();
  //如果都就绪事件为0推出当前循环
  if(readyChannels == 0) continue;
  //调用select方法后,只要有就绪的通道,就开始遍历通道
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    //判断是否是那四种事件类型,做除响应的处理
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}

4.2.EventLoop和EventLoopGroup

1、EventLoop和EventLoopGroup简介

为解决系统在运行中频繁切换上下文带来的性能的损耗。而且要考虑并发下数据的安全。Netty采用了串行化的设计理念,从消息的读取、编码以及后续ChannelHandler的执行,始终都由IO线程EventLoop负责,这就意味着整个流程不会进行线程上下文的切换,数据也不会面临被并发修改的风险。

EventLoopGroup是一组EventLoop的抽象,一个EventLoopGroup当中包含一个或者多个EventLoop,EventLoopGroup提供next接口,可以从一组EventLoop里卖弄按照一定的规则获取其中一个EventLoop来进行工作。

2、分析这两块代码

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

这里创建了两个EventLoopGroup对象bossGroup和workGroup,bossGroup主要用来处理server socket监听client socket的连接请求,server socket接收了新的连接后,会把connection socket放到workGroup中进行处理,也就是说workGroup主要用来处理connection socket的网络IO和相关的业务逻辑。workGroup会由next选择其中一个EventLoop来讲这个SocketChannel注册到其维护的Selector并对后续的IO事件进行处理。

ChannelPipeline中每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。所以至关重要不要阻塞这个线程。

总结:

  • NioEventLoopGroup实际上就是个线程池,一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有的EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册一个EventLoop
  • 每一个EventLoop负责处理一个或者多个Channel
  • 一个EventLoop维护一个Selector

3、跟踪NioEventLoopGroup构造函数

(1)NioEventLoopGroup

 public NioEventLoopGroup(int nThreads) {
     this(nThreads, (Executor) null);
 }
    
 public NioEventLoopGroup() {
     this(0);
 }

(2)进入到NioEventLoopGroup的父类MultithreadEventLoopGroup

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

可以看见NioEventGroup的构造函数如果nThreads为非0值时,则为传入的实际值,如果为0的话或者没有参数,则为DEFAULT_EVENT_LOOP_THREADS,DEFAULT_EVENT_LOOP_THREADS为系统内核的2倍。

    static {
        //获取系统的内核*2付给DEFAULT_EVENT_LOOP_THREADS,静态块加载
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4.3.Netty的启动引导类

Netty的启动类分为客户端启动类和服务端启动类,分别是Bootstrap和ServerBootStrap。他们都是AbstractBootStrap的子类,总的来说他们都是Netty中的辅助类,提供了链式配置方法,方便Channel的引导和启动。

1、服务端启动引导类ServerBootStrap

(1)group:设置线程模型,Reactor线程模型对比EventLoopGroup

  • 先看一下ServerBootStrap的group方法

    • 双参group方法

    在这里插入图片描述

    • 单参group方法

    在这里插入图片描述

  • 单线程方式

在这里插入图片描述

  • 多线程方式

在这里插入图片描述

  • 主从线程方式

在这里插入图片描述

(2)channel:设置channel通道类型NioServerSocketChannel、OioServerSocketChannel

  • 用来设置交互模型的,5种IO模型

(3)option:作用于每个新建立的channel,设置TCP连接中的一些参数

  • ChannelOption.SO_BACKLOG:存放已完成三次握手的请求的等待队列的最大长度。
  • 设置ChannelOption.SO_BACKLOG参数

在这里插入图片描述

  • Linux服务器TCP连接底层:
    • syn queue:半连接队列,tcp_max_syn_backlog
    • accept_queue:全连接队列,net.core.somaxconn
  • 系统默认的somaxconn参数要足够大,如果backlog比somaxconn大,则会优先用后者。

在这里插入图片描述

  • ChannelOption.TCP_NODELAY:为解决Nagle算法的问题,默认是false,要求高实时性,有数据时马上发送,就将该选项设置为true打开Nagle算法。

(4)childOption: 作用于被accept之后的连接

(5)childHandler:用于对每个通道里面的数据处理

在这里插入图片描述

2、客户端启动引导类Bootstrap

在这里插入图片描述

3、客户端与服务端的引导过程

(1)引导服务器

在这里插入图片描述

(2)引导客户端

在这里插入图片描述

5.Netty数据传输编解码

5.1.什么是编码、解码

  • 高性能RPC框架的三个要素:IO模型、数据协议、线程模型
  • 最开始接触的编码:Java序列化/反序列化、URL编码、base64编解码
  • java自带序列化的缺点:
    • 无法跨语言
    • 序列化后的码流太大,也就是数据报太大
    • 序列化和反序列化性能比较差
  • 业界里面也有其他编码框架:
    • ProtoBuf(PB):ProtoBuf是google的一个结构数据序列化方法框架,可简单类比XML,语言无关、平台无关,支持java、c、python等多种语言,高效,比XML更小,扩展性、兼容性好。
    • Trift:Facebook下的一款编解码框架,thrift可以支持多种程序语言,在多种不同的语言之间通信thrift可以作为二进制的高性能的通讯中间件,支持数据(对象)序列化和多种类型的RPC服务。Thrift适用于程序对程序静态的数据交换,需要先确定好他的数据结构,他是完全静态化的,当数据结构发生变化时,必须重新编辑IDL文件。
  • Netty里面的编解码
    • 解码器:负责处理“入站 InboundHandler”数据
    • 编码器:负责处理“出站 OutboundHandler”数据
    • Netty里面提供默认的编解码器,也支持自定义编解码器
      • Encoder:编码器
      • Decoder:解码器
      • Codec:编解码器

5.2.Netty解码器之Decoder

Netty提供了丰富的节码器抽象基类,我们可以很容易的实现这些基类来实现自定义的解码器。

  • 解码字节到消息:ByteToMessageDecoder和ReplayingDecoder
  • 解码消息到消息:MessageToMessageDecoder

decoder负责将“入站”数据从一种格式转换成另一种格式,Netty的节码是一种ChannelInboundHandler的抽象实现。实践中使用解码器很简单,就是将入站数据转换格式后传递到ChannelPipeline中的下一个ChannelInboundHandler进行处理,这样的处理是很灵活的,我们可以将解码器放在ChannelPipeline中,重用逻辑。

1、ByteToMessageDecoder

ByteToMessageDecoder是用于将字节转为消息(或其他字节序列)

你不能确定远端是否会一次发送完一个完整的“消息”,因此这个类会缓存入站的数据,直到准备好了用于处理。

方法名称描述
decode它是用一个ByteBuf调用的,ByteBuf包含传入的字节和一个添加解码消息的列表。重复调用decode(),直到返回时列表为空。然后将列表的内容传递给管道中的下一个处理程序。
decodeLast提供的默认实现只调用decode()。当通道处于非活动状态时,此方法只调用一次。覆盖以提供特殊的。

假如我们接收了一个包含简单整数的字节流,每个都要单独处理,,我们将从入站 ByteBuf 读取每个整数并将其传递给 pipeline 中的下一个ChannelInboundHandler。“解码”字节流成整数我们将扩展ByteToMessageDecoder,实现类为“ToIntegerDecoder”。

在这里插入图片描述

每次从入站的ByteBuf读取四个字节,解码成整型,并添加到一个List,当不能在添加数据到List中时,它所包含的内容就会被发送到下一个ChannelInboudnHandler。

在这里插入图片描述

(1)继承ByteToMessageDecoder实现decode方法

(2)检查可读的字节是否少于4个(int类型是四个字节长度)

(3)从入站ByteBuf读取int,添加到节码消息的List中

尽管ByteToMessageDecoder简化了这个模式,但是在实际操作中(readInt()之前),必须要验证下ByteBuf要有足够的数据。

2、ReplayingDecoder

ReplayingDecoder是ByteToMessageDecoder的一个实现类,读取缓存中数据之前需要先检查下缓存中数据是否有足够字节,使用ReplayingDecoder就无需自己检查,若ByteBuf中有足够的字节,则会正常读取,若没有足够的字节则会停止解码。

正因为ReplayingDecoder是ByteToMessage的包装类,所以它会带有一定的局限性:

  • 不是所有的标准ByteBuf操作都被支持,如果调用一个不支持的操作会抛出UnreplayableOperationException
  • ReplayingDecoder性能慢于ByteToMessageDecoder

如果这些局限性是你可以接受的,那么你可以使用ReplayingDecoder,相反,如果没有引入过多的复杂性,使用ByteToMessageDecoder更优。

在这里插入图片描述

(1)继承ReplayingDecoder用于将字节码转换为消息

(2)从入站的ByteBuf中读取整型,并添加到节码消息的List中

3、MessageToMessageDecoder

用于从一种消息解码成另一种消息(例如:POIO到POJO)

将Integer转换为String,我们自定义IntegerToStringDecoder,继承自MessageToMessageDecoder。

也就是说,入站消息是按照在类定义中声明的参数类型(这里是 Integer) 而不是 ByteBuf来解析的。在之前的例子,解码消息(这里是String)将被添加到List,并传递到下个 ChannelInboundHandler。

在这里插入图片描述

代码实现:

在这里插入图片描述

(1)实现继承自 MessageToMessageDecoder

(2)转换消息为字符串,加到节码队列中

4、解码时太大的帧处理

Netty是异步框架需要缓冲区字节在内存中,直到你能够节码它们。一次,不能让解码器缓存太多的数据以免耗尽可用内存。为了解决这个问题,Netty提供了一个TooLongFrameException,通常由解码器在帧时间过长抛出。

TooLongFrameException 抛出(并由 ChannelHandler.exceptionCaught() 捕获)。然后由译码器的用户决定如何处理它。虽然一些协议,比如 HTTP、允许这种情况下有一个特殊的响应,有些可能没有,事件唯一的选择可能就是关闭连接。ByteToMessageDecoder 可以利用 TooLongFrameException 通知其他 ChannelPipeline 中的 ChannelHandler。

在这里插入图片描述

(1)实现继承 ByteToMessageDecoder 来将字节解码为消息

(2)检测缓冲区数据是否大于 MAX_FRAME_SIZE

(3)忽略所有可读的字节,并抛出 TooLongFrameException 来通知 ChannelPipeline 中的 ChannelHandler 这个帧数据超长

5、Netty中常用的几种解码器

  • LineBasedFrameDecoder
  • DelimiterBaesdFrameDecoder
  • FixedLengthFrameDecoder
  • StringDecoder

(1)LineBasedFrameDecoder

LineBasedFrameDecoder行解码器,遍历ByteBuf中可读字节,按行(\n \r\n)处理。

(2)StringDecoder

StringDecoder将接收的码流转化为字符串

  • 代码中使用

在这里插入图片描述

在这里插入图片描述

(3)DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder,将特定分隔符作为码流结束标志的解码器。

  • 代码中使用

在这里插入图片描述

(4)FixedLengthFrameDecoder

FixedLengthFrameDecoder固定长度节码器,只会读取指定长度的码流。

  • 代码中使用

在这里插入图片描述

5.3.Netty编码器之Encoder

Encoder是用来把出站数据从一种格式转换成另外一种格式,因此它实现了ChannelOutboundHandler。就像Decoder一样,Netty也为你提供了一组类来写Encoder,当然这些提供的是与Decoder完全相反的方法,如下所示:

  • 编码从消息到字节
  • 编码从消息到消息

1、MessageToByteEncoder

这个类只有一个方法,而Decoder却有两个,原因就是Decoder经常需要在Channel关闭时产生一个“最后的消息”。出于这个原因,提供了decodeLast(),而Encoder没有这个需求。

方法名称描述
encodeencode方法是您需要实现的唯一抽象方法。它是通过出站消息调用的,这个类将把出站消息编码为ByteBuf。然后将ByteBuf转发到ChannelPipeline中的下一个ChannelOutboundHandler。

下图实例,我们想生产值,并将他们编码成ByteBuf来发送到线上,我们提供了ShortToByteEncoder来实现该目的。

在这里插入图片描述

上图展示了,Encoder收到了Short消息,进行编码,并把它们写入ByteBuf。ByteBuf接着前面进到下一个pipeline的ChannelOutboundHandler。每个 Short 将占用 ByteBuf 的两个字节。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2ZqO6x78-1667215373979)(images/5.4(2).jpg)]

(1)实现继承自 MessageToByteEncoder

(2)写 Short 到 ByteBuf

Netty 提供很多 MessageToByteEncoder 类来帮助你的实现自己的 encoder 。其中 WebSocket08FrameEncoder 就是个不错的范例。

2、MessageToMessageEncoder

我们已经知道了如何将入站数据从一个消息格式解码成另一个格式。现在我们需要一种方法来将出站数据从一种消息编码成另一种消息。MessageToMessageEncoder 提供此功能,同样的只有一个方法,因为不需要产生“最后的消息”。

下面例子,我们将要解码 Integer 消息到 String 消息。可简单使用 MessageToMessageEncoder。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7DHocuyE-1667215373980)(images/5.4(3).jpg)]

encoder 从出站字节流提取 Integer,以 String 形式传递给ChannelPipeline 中的下一个 ChannelOutboundHandler 。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-C641NNQq-1667215373980)(images/5.4(4).jpg)]

(1)实现继承自 MessageToMessageEncoder

(2)转 Integer 为 String,并添加到 MessageBuf

5.4.Netty编解码器之Codec

我们在讨论解码器和编码器的时候,都是把它们当成不同的实体的,但是有时候如果在同一个类中同时放入入站和出站的数据和信息转换的话,发现会更加实用。而Netty中的抽象Codec(变解码器)类就能达到这个目的,它们成对的组合解码器和编码器,以此提供对于字节和消息都相同的操作(这些类实现了ChannelInboundHandler和ChannelOutboundHandler)。

1、ByteToMessageCodec

我们需要解码字节到消息,也许是一个POJO,然后转回来,ByteToMessageCodec将为我们处理这个问题,因为他结合了ByteToMessageDecoder和MessageToByteEncoder。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qHTzcVX4-1667215373980)(images/5.5(1).jpg)]

类的继承图中我们可以看出,ByteToMessageCodec继承自ChannelDuplexHandler,ChannelDuplexHandler继承自ChannelInboundHandlerAdapter,实现于ChannelOutboundHandler接口,前面我们知道ByteToMessageDecoder继承ChannelInboundHandlerAdapter,MessageToByteEncoder继承自ChannelOutboundHandlerAdapter。所以ByteToMessageCodec兼顾编码、解码的功能。

2、MessageToMessageCodec

和ByteToMessageCodec一样。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gapS6SNb-1667215373981)(images/5.5(2).jpg)]

3、编解码器的优缺点

  • 优点:成对出现,编解码都是在一个类里面完成
  • 缺点:耦合在一起,扩展性不佳

6.Netty网络传输TCP粘包拆包

6.1.TCP粘包拆包讲解

1、TCP粘包、TCP拆包

TCP粘包就是指发送方发送的若干包数据到达接收方时粘成一个包,从接收缓冲区来看,后一包数据的头紧接着前一包数据的尾,出现粘包的原因是多方面的,可能是来自发送方,也可能来自接收方。

在这里插入图片描述

2、出现TCP粘包的原因

(1)发送方原因

TCP默认使用Nagle算法(主要作用:减少网络中报文段的数量),而Nagle算法主要做两件事:

  • 只有上一个分组得到确认,才会发送下一个分组

  • 收集多个小分组,在一个确认到来时一起发送

Nagle算法造成了发送方可能会出现粘包问题

Nagle算法是指发送方发送的数据不会立即发出,而是先放在缓冲区,等待缓冲区满了在发出,发送完一批数据后,会等待接收方对这批数据的回应,然后在发送下一批数据。Nagle算法适用于发送方需要发送大批量数据,并且接收方会及时做出回应的场合,这种算法通过减少传输数据的次数来提高通信效率。

(2)接收方原因

TCP接收到数据包时,并不会马上交到应用层进行处理,或者说应用层并不会立即处理。实际上,TCP将收到的数据包保存在接收缓存里,然后应用程序主动从缓存读取收到的分组。这样一来,如果TCP接收数据包到缓存的速度大于应用程序从缓存中读取数据包的速度,多个包就会被缓存,应用程序就有可能读取到多个首尾相接粘到一起的包。

3、什么时候需要处理粘包现象

如果发送方发送的多组数据本来就是同一块数据的不同部分,比如说一个文件被分成多个部分发送,这时当然不需要处理粘包现象。

如果多个分组毫不相干,甚至是并列关系,那么这个时候就一定要处理粘包现象了。

4、如何处理粘包现象

(1)发送方

对于发送方造成的粘包问题,可以通过关闭Nagle算法来解决,使用TCP_NODELAY选项来关闭。

(2)接收方

接收方没有办法来处理粘包现象,只能将问题交给应用层来处理。

(3)应用层

应用层的解决办法简单可行,不仅能解决接收方的粘包问题,还可以解决发送方的粘包问题。

解决办法:循环处理,应用程序从接收缓存中读取分组时,读完一条数据,就应该循环读取下一条数据,直至所有数据都别处理完成。

如何判断每条数据的长度呢?

格式化数据:每条数据有固定的格式(开始符,结束符),这种方法简单易行,但是选择开始符和结束符时一定要确保每条数据的内部不包含开始符和结束符。

发送长度:发送每条数据时,将数据的长度一并发送,例如规定数据的前4位是数据的长度,应用层在处理时可以根据长度来判断每个分组的开始和结束位置。

5、UDP不会产生粘包问题

TCP为例保证可靠性传输并减少额外的开销(每次发包都要验证),采用了基于流的传输,基于流的传输不认为消息是一条一条的,是无保护消息边界的(保护消息边界:指传输协议把数据当做一条独立的消息在网上传输,接收端一次只能接受一条独立的消息)。

UDP则是面向消息传输的,是有保护消息边界的,接收方一次只接受一条独立的信息,所以不存在粘包问题。
UDP不存在粘包问题,是由于UDP发送的时候,没有经过Negal算法优化,不会将多个小包合并一次发送出去。另外,在UDP协议的接收端,采用了链式结构来记录每一个到达的UDP包,这样接收端应用程序一次recv只能从socket接收缓冲区中读出一个数据包。也就是说,发送端send了几次,接收端必须recv几次(无论recv时指定了多大的缓冲区)。

举个例子:有三个数据包,大小分别为2k、4k、6k,如果采用UDP发送的话,不管接受方的接收缓存有多大,我们必须要进行至少三次以上的发送才能把数据包发送完,但是使用TCP协议发送的话,我们只需要接受方的接收缓存有12k的大小,就可以一次把这3个数据包全部发送完毕。

6、TCP拆包

TCP拆包就是一个完整的包可能会被TCP拆分为多个包进行发送。

在这里插入图片描述

发生拆包的原因:

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。

  • 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

TCP拆包同样可以通过添加边界信息或者数据报长度信息来解决。

6.2.半包读写常见解决方案

  • 发送方:关闭Nagle算法

  • 接受方:TCP是无界的数据流,并没有处理粘包现象的机制,且协议本身无法避免粘包,可以在应用层处理。

  • 应用层:

    • 设置定长消息(24个字符)
    • 设置消息的边界($_切割)
    • 使用带消息头的协议,消息头存储消息开始标识及消息的长度信息(Header+Body)

6.3.Netty自带解决TCP半包读写方案

  • DelimiterBasedFrameDecoder:指定消息分隔符的解码器

  • LineBaseFrameDecoder:以换行符为结束标志的解码器

  • FixedLengthFrameDecoder:固定长度解码器

  • LengthFieldBasedFrameDecoder:message=header+body,基于长度解码的通用解码器

6.4.半包读写问题案例

(1)EchoServer编写

//创建启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //加入服务端线程组
            serverBootstrap.group(bossGroup,workGroup)
                    //设置管道
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //加入处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //加入处理器ServerHandler
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });

            System.out.println("Echo服务启动中...");

(2)ServerHandler处理器编写

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String body = new String(bytes,"UTF-8").substring(0,bytes.length - System.getProperty("line.separator").length());

        System.out.println("服务端收到消息内容为:"+body+",收到消息次数:"+ ++counter);

    }

(3)测试

在这里插入图片描述

在这里插入图片描述

6.4.空格解码器案例

LineBasedFrameDecoder

(1)EchoServer编写

//加入服务端线程组
            serverBootstrap.group(bossGroup,workGroup)
                    //设置管道
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //加入处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //1024参数为,当没有截取到换行符时,但是字节已经超过1024个,就会抛异常TooLongFrameException
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //String解码器,InboundHandler接收到的消息能只直接转换成String类型
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });

            System.out.println("Echo服务启动中...");

(2)ServerHandler处理器

	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        String body = (String) msg;

        System.out.println("服务端收到消息内容为:"+body+",收到消息次数:"+ ++counter);

    }

(3)客户端都一样,测试

在这里插入图片描述

6.5.自定义解码器案例

DelimiterBasedFrameDecoder

(1)EchoClient编写

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String message = "11111111111111111&_222222222222222222222&_33333333333333333333&_444444444444444444444444&_";

        ByteBuf msg = null;
        msg = Unpooled.buffer(message.getBytes().length);
        msg.writeBytes(message.getBytes());
        ctx.writeAndFlush(msg);

    }

(2)ServerHandler编写

//加入服务端线程组
            serverBootstrap.group(bossGroup,workGroup)
                    //设置管道
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //加入处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                            //指定分隔符为"&_"
                            ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes());
                            //构建DelimiterBasedFrameDecoder处理器
                            //1024参数为,当没有截取到换行符时,但是字节已经超过1024个,就会抛异常TooLongFrameException
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });

            System.out.println("Echo服务启动中...");

(3)测试

在这里插入图片描述

在这里插入图片描述

5.Netty数据传输编解码

5.1.什么是编码、解码

  • 高性能RPC框架的三个要素:IO模型、数据协议、线程模型
  • 最开始接触的编码:Java序列化/反序列化、URL编码、base64编解码
  • java自带序列化的缺点:
    • 无法跨语言
    • 序列化后的码流太大,也就是数据报太大
    • 序列化和反序列化性能比较差
  • 业界里面也有其他编码框架:
    • ProtoBuf(PB):ProtoBuf是google的一个结构数据序列化方法框架,可简单类比XML,语言无关、平台无关,支持java、c、python等多种语言,高效,比XML更小,扩展性、兼容性好。
    • Trift:Facebook下的一款编解码框架,thrift可以支持多种程序语言,在多种不同的语言之间通信thrift可以作为二进制的高性能的通讯中间件,支持数据(对象)序列化和多种类型的RPC服务。Thrift适用于程序对程序静态的数据交换,需要先确定好他的数据结构,他是完全静态化的,当数据结构发生变化时,必须重新编辑IDL文件。
  • Netty里面的编解码
    • 解码器:负责处理“入站 InboundHandler”数据
    • 编码器:负责处理“出站 OutboundHandler”数据
    • Netty里面提供默认的编解码器,也支持自定义编解码器
      • Encoder:编码器
      • Decoder:解码器
      • Codec:编解码器

5.2.Netty解码器之Decoder

Netty提供了丰富的节码器抽象基类,我们可以很容易的实现这些基类来实现自定义的解码器。

  • 解码字节到消息:ByteToMessageDecoder和ReplayingDecoder
  • 解码消息到消息:MessageToMessageDecoder

decoder负责将“入站”数据从一种格式转换成另一种格式,Netty的节码是一种ChannelInboundHandler的抽象实现。实践中使用解码器很简单,就是将入站数据转换格式后传递到ChannelPipeline中的下一个ChannelInboundHandler进行处理,这样的处理是很灵活的,我们可以将解码器放在ChannelPipeline中,重用逻辑。

1、ByteToMessageDecoder

ByteToMessageDecoder是用于将字节转为消息(或其他字节序列)

你不能确定远端是否会一次发送完一个完整的“消息”,因此这个类会缓存入站的数据,直到准备好了用于处理。

方法名称描述
decode它是用一个ByteBuf调用的,ByteBuf包含传入的字节和一个添加解码消息的列表。重复调用decode(),直到返回时列表为空。然后将列表的内容传递给管道中的下一个处理程序。
decodeLast提供的默认实现只调用decode()。当通道处于非活动状态时,此方法只调用一次。覆盖以提供特殊的。

假如我们接收了一个包含简单整数的字节流,每个都要单独处理,,我们将从入站 ByteBuf 读取每个整数并将其传递给 pipeline 中的下一个ChannelInboundHandler。“解码”字节流成整数我们将扩展ByteToMessageDecoder,实现类为“ToIntegerDecoder”。

在这里插入图片描述

每次从入站的ByteBuf读取四个字节,解码成整型,并添加到一个List,当不能在添加数据到List中时,它所包含的内容就会被发送到下一个ChannelInboudnHandler。

在这里插入图片描述

(1)继承ByteToMessageDecoder实现decode方法

(2)检查可读的字节是否少于4个(int类型是四个字节长度)

(3)从入站ByteBuf读取int,添加到节码消息的List中

尽管ByteToMessageDecoder简化了这个模式,但是在实际操作中(readInt()之前),必须要验证下ByteBuf要有足够的数据。

2、ReplayingDecoder

ReplayingDecoder是ByteToMessageDecoder的一个实现类,读取缓存中数据之前需要先检查下缓存中数据是否有足够字节,使用ReplayingDecoder就无需自己检查,若ByteBuf中有足够的字节,则会正常读取,若没有足够的字节则会停止解码。

正因为ReplayingDecoder是ByteToMessage的包装类,所以它会带有一定的局限性:

  • 不是所有的标准ByteBuf操作都被支持,如果调用一个不支持的操作会抛出UnreplayableOperationException
  • ReplayingDecoder性能慢于ByteToMessageDecoder

如果这些局限性是你可以接受的,那么你可以使用ReplayingDecoder,相反,如果没有引入过多的复杂性,使用ByteToMessageDecoder更优。

在这里插入图片描述

(1)继承ReplayingDecoder用于将字节码转换为消息

(2)从入站的ByteBuf中读取整型,并添加到节码消息的List中

3、MessageToMessageDecoder

用于从一种消息解码成另一种消息(例如:POIO到POJO)

将Integer转换为String,我们自定义IntegerToStringDecoder,继承自MessageToMessageDecoder。

也就是说,入站消息是按照在类定义中声明的参数类型(这里是 Integer) 而不是 ByteBuf来解析的。在之前的例子,解码消息(这里是String)将被添加到List,并传递到下个 ChannelInboundHandler。

在这里插入图片描述

代码实现:

在这里插入图片描述

(1)实现继承自 MessageToMessageDecoder

(2)转换消息为字符串,加到节码队列中

4、解码时太大的帧处理

Netty是异步框架需要缓冲区字节在内存中,直到你能够节码它们。一次,不能让解码器缓存太多的数据以免耗尽可用内存。为了解决这个问题,Netty提供了一个TooLongFrameException,通常由解码器在帧时间过长抛出。

TooLongFrameException 抛出(并由 ChannelHandler.exceptionCaught() 捕获)。然后由译码器的用户决定如何处理它。虽然一些协议,比如 HTTP、允许这种情况下有一个特殊的响应,有些可能没有,事件唯一的选择可能就是关闭连接。ByteToMessageDecoder 可以利用 TooLongFrameException 通知其他 ChannelPipeline 中的 ChannelHandler。

在这里插入图片描述

(1)实现继承 ByteToMessageDecoder 来将字节解码为消息

(2)检测缓冲区数据是否大于 MAX_FRAME_SIZE

(3)忽略所有可读的字节,并抛出 TooLongFrameException 来通知 ChannelPipeline 中的 ChannelHandler 这个帧数据超长

5、Netty中常用的几种解码器

  • LineBasedFrameDecoder
  • DelimiterBaesdFrameDecoder
  • FixedLengthFrameDecoder
  • StringDecoder

(1)LineBasedFrameDecoder

LineBasedFrameDecoder行解码器,遍历ByteBuf中可读字节,按行(\n \r\n)处理。

(2)StringDecoder

StringDecoder将接收的码流转化为字符串

  • 代码中使用

在这里插入图片描述

在这里插入图片描述

(3)DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder,将特定分隔符作为码流结束标志的解码器。

  • 代码中使用

在这里插入图片描述

(4)FixedLengthFrameDecoder

FixedLengthFrameDecoder固定长度节码器,只会读取指定长度的码流。

  • 代码中使用

在这里插入图片描述

5.3.Netty编码器之Encoder

Encoder是用来把出站数据从一种格式转换成另外一种格式,因此它实现了ChannelOutboundHandler。就像Decoder一样,Netty也为你提供了一组类来写Encoder,当然这些提供的是与Decoder完全相反的方法,如下所示:

  • 编码从消息到字节
  • 编码从消息到消息

1、MessageToByteEncoder

这个类只有一个方法,而Decoder却有两个,原因就是Decoder经常需要在Channel关闭时产生一个“最后的消息”。出于这个原因,提供了decodeLast(),而Encoder没有这个需求。

方法名称描述
encodeencode方法是您需要实现的唯一抽象方法。它是通过出站消息调用的,这个类将把出站消息编码为ByteBuf。然后将ByteBuf转发到ChannelPipeline中的下一个ChannelOutboundHandler。

下图实例,我们想生产值,并将他们编码成ByteBuf来发送到线上,我们提供了ShortToByteEncoder来实现该目的。

在这里插入图片描述

上图展示了,Encoder收到了Short消息,进行编码,并把它们写入ByteBuf。ByteBuf接着前面进到下一个pipeline的ChannelOutboundHandler。每个 Short 将占用 ByteBuf 的两个字节。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2ZqO6x78-1667215373979)(images/5.4(2).jpg)]

(1)实现继承自 MessageToByteEncoder

(2)写 Short 到 ByteBuf

Netty 提供很多 MessageToByteEncoder 类来帮助你的实现自己的 encoder 。其中 WebSocket08FrameEncoder 就是个不错的范例。

2、MessageToMessageEncoder

我们已经知道了如何将入站数据从一个消息格式解码成另一个格式。现在我们需要一种方法来将出站数据从一种消息编码成另一种消息。MessageToMessageEncoder 提供此功能,同样的只有一个方法,因为不需要产生“最后的消息”。

下面例子,我们将要解码 Integer 消息到 String 消息。可简单使用 MessageToMessageEncoder。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7DHocuyE-1667215373980)(images/5.4(3).jpg)]

encoder 从出站字节流提取 Integer,以 String 形式传递给ChannelPipeline 中的下一个 ChannelOutboundHandler 。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-C641NNQq-1667215373980)(images/5.4(4).jpg)]

(1)实现继承自 MessageToMessageEncoder

(2)转 Integer 为 String,并添加到 MessageBuf

5.4.Netty编解码器之Codec

我们在讨论解码器和编码器的时候,都是把它们当成不同的实体的,但是有时候如果在同一个类中同时放入入站和出站的数据和信息转换的话,发现会更加实用。而Netty中的抽象Codec(变解码器)类就能达到这个目的,它们成对的组合解码器和编码器,以此提供对于字节和消息都相同的操作(这些类实现了ChannelInboundHandler和ChannelOutboundHandler)。

1、ByteToMessageCodec

我们需要解码字节到消息,也许是一个POJO,然后转回来,ByteToMessageCodec将为我们处理这个问题,因为他结合了ByteToMessageDecoder和MessageToByteEncoder。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qHTzcVX4-1667215373980)(images/5.5(1).jpg)]

类的继承图中我们可以看出,ByteToMessageCodec继承自ChannelDuplexHandler,ChannelDuplexHandler继承自ChannelInboundHandlerAdapter,实现于ChannelOutboundHandler接口,前面我们知道ByteToMessageDecoder继承ChannelInboundHandlerAdapter,MessageToByteEncoder继承自ChannelOutboundHandlerAdapter。所以ByteToMessageCodec兼顾编码、解码的功能。

2、MessageToMessageCodec

和ByteToMessageCodec一样。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gapS6SNb-1667215373981)(images/5.5(2).jpg)]

3、编解码器的优缺点

  • 优点:成对出现,编解码都是在一个类里面完成
  • 缺点:耦合在一起,扩展性不佳

6.Netty网络传输TCP粘包拆包

6.1.TCP粘包拆包讲解

1、TCP粘包、TCP拆包

TCP粘包就是指发送方发送的若干包数据到达接收方时粘成一个包,从接收缓冲区来看,后一包数据的头紧接着前一包数据的尾,出现粘包的原因是多方面的,可能是来自发送方,也可能来自接收方。

在这里插入图片描述

2、出现TCP粘包的原因

(1)发送方原因

TCP默认使用Nagle算法(主要作用:减少网络中报文段的数量),而Nagle算法主要做两件事:

  • 只有上一个分组得到确认,才会发送下一个分组

  • 收集多个小分组,在一个确认到来时一起发送

Nagle算法造成了发送方可能会出现粘包问题

Nagle算法是指发送方发送的数据不会立即发出,而是先放在缓冲区,等待缓冲区满了在发出,发送完一批数据后,会等待接收方对这批数据的回应,然后在发送下一批数据。Nagle算法适用于发送方需要发送大批量数据,并且接收方会及时做出回应的场合,这种算法通过减少传输数据的次数来提高通信效率。

(2)接收方原因

TCP接收到数据包时,并不会马上交到应用层进行处理,或者说应用层并不会立即处理。实际上,TCP将收到的数据包保存在接收缓存里,然后应用程序主动从缓存读取收到的分组。这样一来,如果TCP接收数据包到缓存的速度大于应用程序从缓存中读取数据包的速度,多个包就会被缓存,应用程序就有可能读取到多个首尾相接粘到一起的包。

3、什么时候需要处理粘包现象

如果发送方发送的多组数据本来就是同一块数据的不同部分,比如说一个文件被分成多个部分发送,这时当然不需要处理粘包现象。

如果多个分组毫不相干,甚至是并列关系,那么这个时候就一定要处理粘包现象了。

4、如何处理粘包现象

(1)发送方

对于发送方造成的粘包问题,可以通过关闭Nagle算法来解决,使用TCP_NODELAY选项来关闭。

(2)接收方

接收方没有办法来处理粘包现象,只能将问题交给应用层来处理。

(3)应用层

应用层的解决办法简单可行,不仅能解决接收方的粘包问题,还可以解决发送方的粘包问题。

解决办法:循环处理,应用程序从接收缓存中读取分组时,读完一条数据,就应该循环读取下一条数据,直至所有数据都别处理完成。

如何判断每条数据的长度呢?

格式化数据:每条数据有固定的格式(开始符,结束符),这种方法简单易行,但是选择开始符和结束符时一定要确保每条数据的内部不包含开始符和结束符。

发送长度:发送每条数据时,将数据的长度一并发送,例如规定数据的前4位是数据的长度,应用层在处理时可以根据长度来判断每个分组的开始和结束位置。

5、UDP不会产生粘包问题

TCP为例保证可靠性传输并减少额外的开销(每次发包都要验证),采用了基于流的传输,基于流的传输不认为消息是一条一条的,是无保护消息边界的(保护消息边界:指传输协议把数据当做一条独立的消息在网上传输,接收端一次只能接受一条独立的消息)。

UDP则是面向消息传输的,是有保护消息边界的,接收方一次只接受一条独立的信息,所以不存在粘包问题。
UDP不存在粘包问题,是由于UDP发送的时候,没有经过Negal算法优化,不会将多个小包合并一次发送出去。另外,在UDP协议的接收端,采用了链式结构来记录每一个到达的UDP包,这样接收端应用程序一次recv只能从socket接收缓冲区中读出一个数据包。也就是说,发送端send了几次,接收端必须recv几次(无论recv时指定了多大的缓冲区)。

举个例子:有三个数据包,大小分别为2k、4k、6k,如果采用UDP发送的话,不管接受方的接收缓存有多大,我们必须要进行至少三次以上的发送才能把数据包发送完,但是使用TCP协议发送的话,我们只需要接受方的接收缓存有12k的大小,就可以一次把这3个数据包全部发送完毕。

6、TCP拆包

TCP拆包就是一个完整的包可能会被TCP拆分为多个包进行发送。

在这里插入图片描述

发生拆包的原因:

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。

  • 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

TCP拆包同样可以通过添加边界信息或者数据报长度信息来解决。

6.2.半包读写常见解决方案

  • 发送方:关闭Nagle算法

  • 接受方:TCP是无界的数据流,并没有处理粘包现象的机制,且协议本身无法避免粘包,可以在应用层处理。

  • 应用层:

    • 设置定长消息(24个字符)
    • 设置消息的边界($_切割)
    • 使用带消息头的协议,消息头存储消息开始标识及消息的长度信息(Header+Body)

6.3.Netty自带解决TCP半包读写方案

  • DelimiterBasedFrameDecoder:指定消息分隔符的解码器

  • LineBaseFrameDecoder:以换行符为结束标志的解码器

  • FixedLengthFrameDecoder:固定长度解码器

  • LengthFieldBasedFrameDecoder:message=header+body,基于长度解码的通用解码器

6.4.半包读写问题案例

(1)EchoServer编写

//创建启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //加入服务端线程组
            serverBootstrap.group(bossGroup,workGroup)
                    //设置管道
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //加入处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //加入处理器ServerHandler
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });

            System.out.println("Echo服务启动中...");

(2)ServerHandler处理器编写

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String body = new String(bytes,"UTF-8").substring(0,bytes.length - System.getProperty("line.separator").length());

        System.out.println("服务端收到消息内容为:"+body+",收到消息次数:"+ ++counter);

    }

(3)测试

在这里插入图片描述

在这里插入图片描述

6.4.空格解码器案例

LineBasedFrameDecoder

(1)EchoServer编写

//加入服务端线程组
            serverBootstrap.group(bossGroup,workGroup)
                    //设置管道
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //加入处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //1024参数为,当没有截取到换行符时,但是字节已经超过1024个,就会抛异常TooLongFrameException
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //String解码器,InboundHandler接收到的消息能只直接转换成String类型
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });

            System.out.println("Echo服务启动中...");

(2)ServerHandler处理器

	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        String body = (String) msg;

        System.out.println("服务端收到消息内容为:"+body+",收到消息次数:"+ ++counter);

    }

(3)客户端都一样,测试

在这里插入图片描述

6.5.自定义解码器案例

DelimiterBasedFrameDecoder

(1)EchoClient编写

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String message = "11111111111111111&_222222222222222222222&_33333333333333333333&_444444444444444444444444&_";

        ByteBuf msg = null;
        msg = Unpooled.buffer(message.getBytes().length);
        msg.writeBytes(message.getBytes());
        ctx.writeAndFlush(msg);

    }

(2)ServerHandler编写

//加入服务端线程组
            serverBootstrap.group(bossGroup,workGroup)
                    //设置管道
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //加入处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                            //指定分隔符为"&_"
                            ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes());
                            //构建DelimiterBasedFrameDecoder处理器
                            //1024参数为,当没有截取到换行符时,但是字节已经超过1024个,就会抛异常TooLongFrameException
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });

            System.out.println("Echo服务启动中...");

(3)测试

在这里插入图片描述

在这里插入图片描述

8.Netty搭建单机百万连接

8.1.Netty单机百万连接方案

实现单机的百万连接,瓶颈有以下几点:

(1)如何模拟百万连接

(2)突破局部文件句柄的限制

(3)突破全局文件句柄的限制

在Linux系统中,单个进程打开的句柄数是非常有限的,一条TCP连接就对应一个文件句柄,而对于我们应用程序来说,一个服务端默认建立的连接数是有限制的。

如下图所示,通常一个客户端去除一些被占用的端口之后,可用的端口大于只有6w个左右,要想模拟百万连接要比较多的客户端,而且比较麻烦,所以比较麻烦,所以这种方案不适合。

在这里插入图片描述

在服务端启动800~8100,而客户端依旧使用1025-65535范围内可用的端口号,让同一个端口号,可以连接Server的不同端口。这样的话,6W的端口可以连接Server的100个端口,累加起来就能实现近600W左右的连接,TCP是以一个四元组概念,以原IP、原端口号、目的IP、目的端口号来确定的,当原IP 和原端口号相同,但目的端口号不同,最终系统会把他当成两条TCP 连接来处理,所以TCP连接可以如此设计。

在这里插入图片描述

8.2.Netty搭建百万连接案例

1、NettyServer服务端代码

public class NettyServer {

    public void run(int beginPort, int endPort) {
        System.out.println("服务端启动中。。");
        //配置服务端线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                //.childOption(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_REUSEADDR, true); //快速复用端口

        serverBootstrap.childHandler(new TCPCountHandler());

        for (; beginPort < endPort; beginPort++) {
            int port = beginPort;
            serverBootstrap.bind(port).addListener((ChannelFutureListener) future -> {
                System.out.println("服务端成功绑定端口 port = " + port);
            });
        }

    }

    /**
     * 启动入口
     *
     * @param args
     */
    public static void main(String[] args) {
        new NettyServer().run(NettyConfig.BEGIN_PORT, NettyConfig.END_PORT);
    }

}

2、TCPCountHandler代码编写

@ChannelHandler.Sharable
public class TCPCountHandler extends ChannelInboundHandlerAdapter {

    //使用原子类,避免线程安全问题
    private AtomicInteger atomicInteger = new AtomicInteger();

    public TCPCountHandler(){
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(()->{
            System.out.println("当前连接数为 = "+atomicInteger.get());
        },0,3, TimeUnit.SECONDS);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        atomicInteger.incrementAndGet();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       atomicInteger.decrementAndGet();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("TCPCountHandler exceptionCaught");
        cause.printStackTrace();
        ctx.close();
    }
}

3、NettyConfig配置类

public class NettyConfig {

    public static int BEGIN_PORT = 8000;

    public static int END_PORT = 8050;

    public static String SERVER_ADDR = "127.0.0.1";

}

4、NettyClient客户端代码

public class NettyClient {

    public void run(int beginPort,int endPort){
        System.out.println("客户端启动中。。");

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();

        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR,true) //快速复用端口
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {

                    }
                });

        int index = 0;

        while(true){
            int finalPort = beginPort + index;
            try {
                bootstrap.connect(NettyConfig.SERVER_ADDR,finalPort).addListener((ChannelFutureListener) future ->{
                    if (!future.isSuccess()){
                        System.out.println("创建连接失败 port = "+finalPort);
                    }
                }).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            ++index;
            if(index == (endPort - beginPort)){
                index = 0;
            }
        }

    }

    /**
     * 启动入口
     * @param args
     */
    public static void main(String[] args) {
        new NettyClient().run(NettyConfig.BEGIN_PORT,NettyConfig.END_PORT);
    }
}

5、maven打包依赖加入pom.xml中

分两次打包,先打包server,在打包client,打包哪个主类的时候,把另一个先注掉。注意这块,打包之前先把NettyConfig中的地址改掉,改成Netty-server的地址。

在这里插入图片描述

<build>
        <plugins>
            <!--maven的默认编译使用的jdk版本貌似很低,使用maven-compiler-plugin插件可以指定项目源码的jdk版本-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>

            <!--将依赖的jar包打包到当前jar包,常规打包是不会将所依赖jar包打进来的-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 打包server主类 -->
                                    <mainClass>com.lixiang.NettyServer</mainClass> 
                                    <!-- 打包client主类 -->
                                    <!--<mainClass>com.lixiang.NettyClient</mainClass>-->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

在这里插入图片描述

8.3.Netty百万连接测试

1、环境准备

两台机器:192.168.159.60(netty-server)、192.168.159.61(netty-client)

虚拟机:centos7系统 4核8G(注意:这块系统参数至少要4核6G)

192.168.159.60(netty-server)放置NettyServer主类jar包

192.168.159.61(netty-client)放置NettyClient主类jar包

先启动server端的jar包,在启动client端的jar包,启动命令:java -jar million-server-1.0-SNAPSHOT.jar

在这里插入图片描述

我们可以看到当前的连接数一直在4000上不去。出现异常 Caused by: java.io.IOException: Too many open files

too many open files:顾名思义即打开过多文件数。不过这里的files不单是文件的意思,也包括打开的通讯链接(比如socket),正在监听的端口等等,所以有时候也可以叫做句柄(handle),这个错误通常也可以叫做句柄数超出系统限制。Linux是有文件句柄限制的,而且默认不是很高,一般都是1024。查看当前用户句柄数限制:

ulimit -n

在这里插入图片描述

我们可以看到当前的文件句柄数是1024,我们的机器是4核的所以大概的连接数在4000左右,那么如何提高文件句柄数呢?

2、修改文件句柄数,让netty-server支持百万连接

(1)root身份下编解/etc/security/limits.conf

 vi /etc/security/limits.conf
 
增加如下:
root soft nofile 1000000
root hard nofile 1000000
* soft nofile 1000000
* hard nofile 1000000

(2)修改全局文件句柄限制(所有进程最大打开的文件数,不同系统是不一样,可以直接echo临时修改)

查看命令:cat /proc/sys/fs/file-max

在这里插入图片描述

(3)永久修改全局文件句柄, 修改后生效 sysctl -p

vi /etc/sysctl.conf

增加 fs.file-max = 1000000

使其生效:sysctl -p

在这里插入图片描述

(4)修改完成后重启机器,client端配置也是一样的

reboot

(5)启动运行jar包

java -jar million-server-1.0-SNAPSHOT.jar  -Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty 单机百万连接测试 的相关文章

  • 2022-08-14 SSH 相关命令详解

    SSH 相关命令详解 sshssh keygenssh copy idssh agent 和 ssh addssh keyscansshd ssh ssh OpenSSH 远端登陆客户端 xff0c 默认22端口 描述 xff1a span
  • 浅谈Centos用户权限管理

    一 用户与组的概念 1 xff0e 理解linux多用户 xff0c 多任务的特性 Linux是一个真实的 完整的多用户多任务操作系统 xff0c 多用户多任务就是可以在系统上建立多个用户 xff0c 而多个用户可以在同一时间内登录同一个系
  • Linux centos升级nodejs,解决升级NodeJS遇到的问题,升级GLIBC、GLIBCXX、gcc(含资源包下载)

    公司网站用的Nuxt开发的 xff0c 本地开发环境NodeJS已经升级到16 14 2版本 xff0c 服务器也要从12版本升级到16 14 2 如需本次安装的资源 xff0c 请下滑到文章下面下载整套资源 NodeJS版本下载地址 xf
  • 关于UEFI引导的理解

    UEFI 和 Legacy区别 UEFT和Legacy是引导模式 xff0c 是用来引导系统的 按下开机键到看到windows标识 Legacy 传统BIOS模式 xff0c 启动顺序 xff1a 开机 gt BIOS初始化 gt BIOS
  • IDEA license server 地址

    旧地址 xff1a http jetbrains license server 新地址 xff1a http fls jetbrains agent com
  • 线性探测再散列

    哈希表又称散列表 哈希表存储的基本思想是 xff1a 以数据表中的每个记录的关键字 k为自变量 xff0c 通过一种函数H k 计算出函数值 把这个值解释为一块连续存储空间 xff08 即数组空间 xff09 的单元地址 xff08 即下标
  • 特征选择的几种方法

    目录 1 过滤法 xff08 Filter xff09 1 1 方差选择法 1 2 相关系数法 1 3 卡方检验 1 4 互信息法 1 5 relief算法 2 包裹法 xff08 Wrapper xff09 2 1 递归特征消除法 2 2
  • Excel调用有道词典实现批量翻译

    如图所示 xff0c 我们在B2单元格中写入公式 xff1a 61 FILTERXML WEBSERVICE 34 http fanyi youdao com translate amp i 61 34 amp A2 amp 34 amp
  • Python的使用技巧:any all的短路

    注意迭代类型和list的结果是不一样的 xff1a if name 61 61 39 main 39 a 61 1 2 3 if any print i is None for i in a print 6666666666 1 2 3 6
  • curl升级到7.87(centos7和TencentOS2.4 tk)

    centos7升级curl到7 8 7 按照之前写过的一篇文章 大致按描述操作即可 只不过需要做一点点修正 CentOS 7升级curl 乐大师的博客 CSDN博客 centos7 curl升级 更新操作中会报错安装失败 提示如下 nbsp
  • Python中raise…from用法

    本来这几天是计划阅读string模块的源码 xff0c 恰好其中一段异常处理的代码我觉得很新奇 xff0c 也就是raise from的用法 xff0c raise的用法大家都知道 因为我之前没遇到过 xff0c 所以就去网上查了相关的资料
  • AI模型隐私风险及防护技术

    一 背景 随着AI成为新一代关键技术趋势 xff0c 围绕着AI的服务也越来越普及 特别是结合了云计算以后 xff0c 机器学习数据的标注 模型训练及预测等服务纷纷上云 xff0c 为用户提供了强大的算力和优秀的算法 xff0c 极大方便了
  • 汉诺塔的图解递归算法

    一 xff0e 起源 xff1a 汉诺塔 xff08 又称河内塔 xff09 问题是源于印度一个古老传说的益智玩具 大梵天创造世界的时候做了三根金刚石柱子 xff0c 在一根柱子上从下往上按照大小顺序摞着64片黄金圆盘 大梵天命令婆罗门把圆
  • 推荐系统中的矩阵分解总结

    最近学习矩阵分解 xff0c 但是学了好多种类 xff0c 都乱了 xff0c 看了这篇文章 xff0c 系统性的总结了矩阵分解 xff0c 感觉很棒 xff0c 故分享如下 前言 推荐系统中最为主流与经典的技术之一是协同过滤技术 xff0
  • 几种常见的离群点检验方法

    在一组平行测定中 xff0c 若有个别数据与平均值差别较大 xff0c 则把此数据视为可疑值 xff0c 也称离群值 如果统计学上认为应该舍弃的数据留用了 xff0c 势必会影响其平均值的可靠性 相反 xff0c 本应该留用的数 据被舍弃
  • Spring框架介绍及使用(一)

    文章目录 概念为什么要用 xff1f Spring的体系结构Spring框架之控制反转 xff08 IOC xff09 概念Spring文件包解释入门程序入门程序需要的jar包配置文件入门程序的建立ApplicationContext与Be
  • SpringMVC 相关配置

    SpringMVC 相关配置 打印请求与响应日志 打印 64 RequestBody 64 Response日志 https blog csdn net ww 1997 article details 116006445 https www
  • 普通表到分区表转换

    A 通过 Export import 方法 B 通过 Insert with a subquery 方法 C 通过 Partition Exchange 方法 D 通过 DBMS REDEFINITION 方法 比如把test用户下的普通表
  • Ubuntu 20.04 上安装 Node.js 和 npm 的三种方法

    主要介绍三种在 Ubuntu 20 04 上安装 Node js 和 npm 的方法 xff1a 通过Ubuntu标准软件库 这是最简单的安装方法 xff0c 并且适用大多数场景 但是标准软件库中最高版本只有 v10 19 0 root 6
  • android databinding 数据绑定错误 错误:任务':app:compileDebugJavaWithJavac' 的执行失败

    今天到公司照常打开项目 xff0c 突然运行不了显示databinding错误 Error Execution failed for task 39 app compileDebugJavaWithJavac 39 gt android d

随机推荐

  • 解决idea新建Module的奇怪路径问题

    问题由来 xff1a 在部署SpringCloud的时候想新建一个module来快速创建 xff0c 结果被创建出来的目录结构搞得一脸懵逼 xff0c 新建的module的根目录跑到了 xff0c 项目的src目录下 xff0c 整个看起来
  • ThingsBoard源码解析-数据订阅与规则链数据处理

    前言 结合本篇对规则链的执行过程进行探讨 根据之前对MQTT源码的学习 xff0c 我们由消息的处理入手 org thingsboard server transport mqtt MqttTransportHandler void pro
  • Thingsboard使用gateway网关

    简介 xff1a 本次是想测试一下thingsboard网关的使用 xff0c 实现通过网关 43 mqtt 43 thingsboard 43 emqx 实现间接设备创建和数据传输 前期准备 xff1a thingsboard平台 thi
  • Thingsboard(2.4 postgresql版)数据库表结构说明

    本文描述的表结构是根据thingsboard2 4 xff08 postgresql版 xff09 数据库中整理出来的 xff0c 不一定完整 xff0c 后续有新的发现再补充文档 一 数据库E R关系 Thingsboard2 4社区版共
  • ThingsBoard—自定义规则节点

    一般的功能 xff0c 可以使用现有的节点来完成 但如果有比较复杂 xff0c 或有自己特殊业务需求的 xff0c 可能就需要自定义了 按官方教程来基本就可以入门 xff0c 如果需要深入 xff0c 可以参考ThingsBoard自有节点
  • Thingsboard 报错 Cannot resolve symbol ‘TransportProtos‘

    本人idea 版本为 2021 1 xff0c 顺利编译 thingsboard 打开进行源码阅读时 xff0c 发现报 Cannot resolve symbol 39 TransportProtos 39 xff0c 如下图 xff1a
  • ThingsBoard 规则引擎-邮件通知

    之前我们已经学习了Thingsboard安装 设备接入 简单的数据可视化内容 xff0c 今天来继续学习下thingsboard其他特性 规则引擎 应用场景 ThingsBoard规则引擎是一个支持高度可定制复杂事件处理的框架 xff0c
  • ThingsBoard编译报错:Failure to find org.gradle:gradle-tooling-api:jar:6.3

    删除本地仓库未下载完成的缓存文件 xff08 删除像图片显示这样以 lastUpdated结尾的文件 xff09 执行mvn v确保maven命令可以正常执行执行下面命令 xff0c 将下载的jar安装到本地仓库 注意 xff1a 将 Df
  • Thingsboard3.4-OTA升级

    背景 在做设备端对接thingsboard平台得时候 xff0c 去研究设备端对接平台的过程中 xff0c 花了不少时间 xff0c 在此之前也没有找到相关的文档 xff0c 于是出于减少大家去研究的时间 xff0c 写了这篇博客 xff0
  • PyCharm更换pip源为国内源、模块安装、PyCharm依赖包导入导出教程

    一 更换pip为国内源 1 使用PyCharm创建一个工程 2 通过File gt Setting 选择解释器为本工程下的Python解释器 3 单击下图中添加 43 xff0c 4 单击下图中的 Manage Repositories 按
  • Pycharm没有找到manage repositories按钮解决方案

    问题描述 xff1a 不知道是因为版本原因还是其他 xff0c pycharm没有找到manage repositories按钮 xff0c 无法更改下载源 xff0c 导致安装库的速度会很慢 解决办法 xff1a 1 点击左下角的pyth
  • 通过改变JVM参数配置降低内存消耗

    有个项目 xff0c 其服务器端原本内存占用很大 xff0c 16G内存几乎都用光了 原先的JVM参数配置是这样的 xff1a Xms16384m Xmx16384m XX PermSize 61 64m XX MaxPermSize 61
  • NodeJS yarn 或 npm如何切换淘宝或国外镜像源

    一 查看当前的镜像源 npm config get registry 或 yarn config get registry 二 设置为淘宝镜像源 xff08 全局设置 xff09 npm config set registry https
  • Centos7 部署InfluxDB

    因为目前网络上关于InfluxDB的资料并不多 xff0c 所以这里建议多参考官网 官网 xff1a Home InfluxData 点击此处的Docs xff1a 这里选择 InfluxDB OSS xff1a 使用文档时根据需求选择查看
  • SpringBoot 集成 Emqx 发布/订阅数据

    最近项目中用到Emqx发布 订阅数据 xff0c 特此记录便于日后查阅 ThingsboardEmqxTransportApplication Copyright 2016 2023 The Thingsboard Authors lt p
  • Centos7部署Minio集群

    1 地址规划 minio1 span class token number 10 0 span 0 200 minio2 span class token number 10 0 span 0 201 minio3 span class t
  • Centos7 部署单机 Minio 对象存储服务

    MinIO 是一款基于 Go 语言发开的高性能 分布式的对象存储系统 xff0c 客户端支持 Java xff0c Net xff0c Python xff0c Javacript xff0c Golang语言 MinIO 的主要目标是作为
  • Netty源码解读

    Netty源码解读 Netty线程模型 1 定义了两组线程池BossGroup和WorkerGroup xff0c BossGroup专门负责接收客户端的连接 WorkerGroup专门负责网络的读写 2 BossGroup和WorkerG
  • Springboot Netty 实现自定义协议

    Netty是由JBOSS提供的一个java开源框架 xff0c 现为 Github上的独立项目 Netty提供异步的 事件驱动的网络应用程序框架和工具 xff0c 用以快速开发高性能 高可靠性的网络服务器和客户端程序 也就是说 xff0c
  • Netty 单机百万连接测试

    1 Netty框架简介 1 1 Netty简介 netty是jboss提供的一个java开源框架 xff0c netty提供异步的 事件驱动的网络应用程序框架和工具 xff0c 用以快速开发高性能 高可用性的网络服务器和客户端程序 也就是说