Java网络编程

2023-11-10

博客说明:内容初稿为本人的学习笔记归纳整理,在此基础上加入了相关视频学习、相关书籍的理解、相关文章博客查阅及源码阅读。

博客的编写已经尽量做到详尽,但免不了有纰漏和理解不到位的地方。

发现博客的任何问题均可联系我 aboutwxf@163.com。谢谢!

目录

一、网络编程概念

二、IO模型

2.1 Java层面的IO模型

2.2 操作系统层面的IO模型

三、多路复用

3.1 select

3.2 poll

3.3 epoll

3.4 应用

四、系统调用和零拷贝

4.1 系统调用

4.2 零拷贝

五、BIO

5.1 InetAddress

5.2 UDP

基本介绍

实现UDP

通讯方式

5.3 TCP

基本介绍

Socket

实现TCP

文件传输

六、NIO

6.1 基本介绍

6.2 实现原理

6.3 缓冲区

6.4 直接内存

6.5 通道

6.6 选择器

6.7 NIO实现

七、AIO


一、网络编程概念

网络编程,就是在一定的协议下,实现两台计算机之间进行通信的技术。网络编程三要素:协议、IP地址、端口号

  1. 协议:计算机网络客户端与服务端通信必须约定和彼此遵守的通信规则,HTTP、FTP、TCP、UDP、SMTP

  2. IP 地址:互联网协议地址(Internet Protocol Address),用来给一个网络中的计算机设备做唯一的编号。

    • IPv4:4 个字节,32 位组成,192.168.1.1

    • IPv6:可以实现为所有设备分配 IP,128 位

  3. 端口:端口号就可以唯一标识设备中的进程(应用程序)。端口号是用两个字节表示的整数,取值范围是 0-65535,0-1023 之间的端口号用于一些知名的网络服务和应用普通的应用程序需要使用 1024 以上的端口号。如果端口号被另外一个服务或应用所占用,会导致当前程序启动失败,报出端口被占用异常

常见端口号:FTP:21 HTTP:80 HTTPS:443 ORACLE:1521 MYSQL:3306 ZOOKEEPER:2181 KAFKA:9092 REDIS:6379 TOMCAT:8080

利用协议+IP 地址+端口号三元组合,就可以标识网络中的进程了,那么进程间的通信就可以利用这个标识与其它进程进行交互。

网络通信协议:对计算机必须遵守的规则,只有遵守这些规则,计算机之间才能进行通信。通信是进程与进程之间的通信,不是主机与主机之间的通信。TCP / UDP / HTTP / FTP / SFTP 等都是比较重要的网络通信协议

TCP / UDP区别:

TCP传输控制协议(Transmission Control Protocol)是面向连接的,提供可靠交付,有流量控制,拥塞控制,提供全双工通信,面向字节流,每一条 TCP 连接只能是点对点的(一对一)

  • 在通信之前必须确定对方在线并且连接成功才可以通信

  • 例如下载文件、浏览网页等(要求可靠传输)

UDP用户数据报协议(User Datagram Protocol)是无连接的,尽最大可能交付,不可靠,没有拥塞控制,面向报文,支持一对一、一对多、多对一和多对多的交互通信

  • 直接发消息给对方,不管对方是否在线,发消息后也不需要确认

  • 无线(视频会议,通话),性能好,可能丢失一些数据

TCP / UDP / IP / HTTP / socket / RPC 都是啥 ?

HTTP:应用层协议

TCP / UDP :传输层协议

IP:网络层协议

SOCKET:不是一种协议,是为了更方便我们使用 TCP/IP而抽象出来的API

RPC : 远程过程调用。

高速公路(IP):关注的是路怎么修,路标怎么分发维护,关注的是如何形成一个到处都可以往来的大地图

卡车(TCP / UDP):关注的是在送往目的地的过程中,货不能多也不能少,保证安全抵达

货物(HTTP):关注的是 货用箱子还是盒子装,负责在箱子上标注货物重量、目的地之类的信息。

所以网络通信基本就是 TCP UDP一类的传输层协议负责将封装好的http报文通过IP协议完整的发送到指定(协议、IP、端口)进程当中。

而RPC它既不是协议也不是API ,是一种方法,描述的是远程服务之间相互调用的概念,RPC框架做一系列的封装就是为了更好的进行远程访问服务

二、IO模型

2.1 Java层面的IO模型

相关概念:

  • 同步:当前线程要自己进行数据的读写操作(自己去银行取钱)

  • 异步:当前线程可以去做其他事情(委托别人拿银行卡到银行取钱,然后给你)

  • 阻塞:在数据没有的情况下,还是要继续等待着读(排队等待)

  • 非阻塞:在数据没有的情况下,会去做其他事情,一旦有了数据再来获取(柜台取款,取个号,然后坐在椅子上做其它事,等号广播会通知你办理)

同步阻塞:一个线程执行一个客户端请求,在此期间无法做其他事。 同步非阻塞:线程执行客户端请求,但是在此期间可以做其他事,只需要定期检查这个客户端请求是否处理完 异步非阻塞:线程立即响应客户端请求,异步执行其他请求,等到客户端请求处理完会及时响应不需要轮询检查

BIO

BIO 表示同步阻塞式通信,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善,实现线程池之后可以形成一种伪异步的IO模型,不需要一个客户端一个线程,实现线程复用来处理很多个客户端,线程可控。但是高并发下性能还是很差:线程数量少,数据依然是阻塞的,数据没有来线程还是要等待。

同步阻塞式性能极差:大量线程,大量阻塞

NIO

NIO 表示同步非阻塞 IO,服务器实现模式为请求对应一个线程,客户端发送的连接会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求时才启动一个线程进行处理

工作原理:1 个主线程专门负责接收客户端,1 个线程轮询所有的客户端,发来了数据才会开启线程处理

同步:线程还要不断的接收客户端连接,以及处理数据

非阻塞:如果一个管道没有数据,不需要等待,可以轮询下一个管道是否有数据

AIO

AIO 表示异步非阻塞 IO,AIO 引入异步通道的概念,采用了 Proactor 模式,有效的请求才启动线程,特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用

异步:服务端线程接收到了客户端管道以后就交给底层处理 IO 通信,线程可以做其他事情

非阻塞:底层也是客户端有数据才会处理,有了数据以后处理好通知服务器应用来启动线程进行处理

2.2 操作系统层面的IO模型

以Linux为例,Linux 有五种 I/O 模型:阻塞式 I/O、非阻塞式 I/O、I/O 复用、信号驱动式 I/O、异步 I/O

阻塞式IO

应用进程通过系统调用 recvfrom 接收数据,该进程会阻塞等待内核,直到内核准备好数据后从内核缓冲区复制到应用进程缓冲区中才返回。阻塞只是当前阻塞进程不消耗 CPU 时间,这种模型的 CPU 利用率会比较高。recvfrom() 用于接收 Socket 传来的数据并复制到应用进程的缓冲区中。

非阻塞式

应用进程通过 recvfrom 调用不停的去和内核交互,直到内核准备好数据。如果没有准备好数据,内核返回一个错误码,过一段时间应用进程再执行 recvfrom 系统调用,在两次发送请求的时间段,进程可以进行其他任务,这种方式称为轮询(polling)

由于 CPU 要处理更多的系统调用以及应用进程的轮询访问,因此这种模型的 CPU 利用率比较低。

信号驱动

应用进程使用 sigaction 系统调用,内核立即返回,应用进程可以继续执行,等待数据阶段应用进程是非阻塞的。当内核数据准备就绪时向应用进程发送 SIGIO 信号,应用进程收到之后在信号处理程序中调用 recvfrom 将数据从内核复制到应用进程中

相比于非阻塞式 I/O 的轮询方式,信号驱动 I/O 的 CPU 利用率更高

IO 复用

多个进程的IO可以注册到同一个管道上,这个管道会统一和内核进行交互。当管道中的某一个请求需要的数据准备好之后,进程再把对应的数据拷贝到用户空间中。IO 复用模型使用 select 或者 poll 函数等待数据,select 会监听所有注册好的 IO,等待多个套接字中的任何一个变为可读,等待过程会被阻塞,当某个套接字准备好数据变为可读时 select 调用就返回,然后调用 recvfrom 把数据从内核复制到进程中

IO 复用让单个进程具有处理多个 I/O 事件的能力,又被称为 Event Driven I/O,即事件驱动 I/O

如果一个 Web 服务器没有 I/O 复用,那么每一个 Socket 连接都要创建一个线程去处理,如果同时有几万个连接,就需要创建相同数量的线程。相比于多进程和多线程技术,I/O 复用不需要进程线程创建和切换的开销,系统开销更小

异步 IO

应用进程执行 aio_read 系统调用会立即返回,给内核传递描述符、缓冲区指针、缓冲区大小等。应用进程可以继续执行不会被阻塞,内核会在所有操作完成之后向应用进程发送信号。异步 I/O 与信号驱动 I/O 的区别在于,异步 I/O 的信号是通知应用进程 I/O 完成,而信号驱动 I/O 的信号是通知应用进程可以开始 I/O

三、多路复用

IO多路复用模型用于解决同步非阻塞NIO模型中轮询耗费大量CPU的问题。IO多路复用模型通过一个监听线程发起另一种形式的系统调用,由一个线程监听多个文件描述符(fd,linux系统把所有网络请求以一个fd来标识),一旦某个fd的操作就绪(一般是内核缓冲区可读/可写),该系统调用就会返回,随后监听线程可以通知程序对准备好了的fd进行对应的IO系统调用,比如通过recvfrom读取数据。有些地方也称这种IO方式为event driven IO,即事件驱动IO,因为不同的IO行为就绪之后,会返回不同的事件加以区分,比如读就绪事件,写就绪事件等。

目前支持IO多路复用有 select、poll、epoll等等系统调用函数,我们可以将多个socket连接注册到同一个select操作上,就能实现同时对多个socket连接的监听,当其中任何一个socket准备就绪,就能返回进行可读(可写),然后进程再进行recvfrom系统调用,将数据由内核拷贝到用户进程,当然这个拷贝的过程是阻塞的。

3.1 select

select 允许应用程序监视一组文件描述符(fd),等待一个或者多个描述符成为就绪状态,从而完成 I/O 操作。

int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • fd_set 使用 bitmap 数组实现,数组大小用 FD_SETSIZE 定义,单进程只能监听少于 FD_SETSIZE 数量的描述符,32 位机默认是 1024 个,64 位机默认是 2048,可以对进行修改,然后重新编译内核

  • fd_set 有三种类型的描述符:readset、writeset、exceptset,对应读、写、异常条件的描述符集合

  • n 是监测的 socket 的最大数量

  • timeout 为超时参数,调用 select 会一直阻塞直到有描述符的事件到达或者等待的时间超过 timeout

struct timeval{
    long tv_sec;    //秒
    long tv_usec;   //微秒
}
  • 方法成功调用返回结果为就绪的文件描述符个数,出错返回结果为 -1,超时返回结果为 0

select 调用流程图:

  1. 使用 copy_from_user 从用户空间拷贝 fd_set 到内核空间,进程阻塞

  2. 注册回调函数 _pollwait

  3. 遍历所有 fd,调用其对应的 poll 方法判断当前请求是否准备就绪,对于 socket,这个 poll 方法是 sock_poll,sock_poll 根据情况会调用到 tcp_poll、udp_poll 或者 datagram_poll,以 tcp_poll 为例,其核心实现就是 _pollwait

  4. _pollwait 把 current(调用 select 的进程)挂到设备的等待队列,不同设备有不同的等待队列,对于 tcp_poll ,其等待队列是 sk → sk_sleep(把进程挂到等待队列中并不代表进程已经睡眠),在设备收到消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时 current 便被唤醒,进入就绪队列

  5. poll 方法返回时会返回一个描述读写操作是否就绪的 mask 掩码,根据这个 mask 掩码给 fd_set 赋值

  6. 如果遍历完所有的 fd,还没有返回一个可读写的 mask 掩码,则会调用 schedule_timeout 让 current 进程进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程,如果超过一定的超时时间(schedule_timeout)没有其他线程唤醒,则调用 select 的进程会重新被唤醒获得 CPU,进而重新遍历 fd,判断有没有就绪的 fd

  7. 把 fd_set 从内核空间拷贝到用户空间,阻塞进程继续执行

3.2 poll

poll 的功能与 select 类似,也是等待一组描述符中的一个成为就绪状态

int poll(struct pollfd *fds, unsigned int nfds, int timeout);

poll 中的描述符是 pollfd 类型的数组,pollfd 的定义如下:

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

select 和 poll 对比

  • select 会修改描述符,而 poll 不会

  • select 的描述符类型使用数组实现,有描述符的限制;而 poll 使用链表实现,没有描述符数量的限制

  • poll 提供了更多的事件类型,并且对描述符的重复利用上比 select 高

  • select 和 poll 速度都比较慢,每次调用都需要将全部描述符数组 fd 从应用进程缓冲区复制到内核缓冲区,同时每次都需要在内核遍历传递进来的所有 fd ,这个开销在 fd 很多时会很大

  • 几乎所有的系统都支持 select,但是只有比较新的系统支持 poll

  • select 和 poll 的时间复杂度 O(n),对 socket 进行扫描时是线性扫描,即采用轮询的方法,效率较低,因为并不知道具体是哪个 socket 具有事件,所以随着 fd 数量的增加会造成遍历速度慢的线性下降性能问题

  • poll 还有一个特点是水平触发,如果报告了 fd 后,没有被处理,那么下次 poll 时会再次报告该 fd

  • 如果一个线程对某个描述符调用了 select 或者 poll,另一个线程关闭了该描述符,会导致调用结果不确定

3.3 epoll

epoll 使用事件的就绪通知方式,通过 epoll_ctl() 向内核注册新的描述符或者是改变某个文件描述符的状态。已注册的描述符在内核中会被维护在一棵红黑树上,一旦该 fd 就绪,内核通过 callback 回调函数将 I/O 准备好的描述符加入到一个链表中管理,进程调用 epoll_wait() 便可以得到事件就绪的描述符

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  • epall_create:一个系统函数,函数将在内核空间内创建一个 epoll 数据结构,可以理解为 epoll 结构空间,返回值为 epoll 的文件描述符编号,以后有 client 连接时,向该 epoll 结构中添加监听,所以 epoll 使用一个文件描述符管理多个描述符

  • epall_ctl:epoll 的事件注册函数,select 函数是调用时指定需要监听的描述符和事件,epoll 先将用户感兴趣的描述符事件注册到 epoll 空间。此函数是非阻塞函数,用来增删改 epoll 空间内的描述符,参数解释:

    • epfd:epoll 结构的进程 fd 编号,函数将依靠该编号找到对应的 epoll 结构

    • op:表示当前请求类型,有三个宏定义:

      • EPOLL_CTL_ADD:注册新的 fd 到 epfd 中

      • EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件

      • EPOLL_CTI_DEL:从 epfd 中删除一个 fd

    • fd:需要监听的文件描述符,一般指 socket_fd

    • event:告诉内核对该 fd 资源感兴趣的事件,epoll_event 的结构:

      struct epoll_event {
          _uint32_t events;   /*epoll events*/
          epoll_data_t data;  /*user data variable*/
      }

      events 可以是以下几个宏集合:EPOLLIN、EPOLOUT、EPOLLPRI、EPOLLERR、EPOLLHUP(挂断)、EPOLET(边缘触发)、EPOLLONESHOT(只监听一次,事件触发后自动清除该 fd,从 epoll 列表)

  • epoll_wait:等待事件的产生,类似于 select() 调用,返回值为本次就绪的 fd 个数,直接从就绪链表获取,时间复杂度 O(1)

    • epfd:指定感兴趣的 epoll 事件列表

    • events:指向一个 epoll_event 结构数组,当函数返回时,内核会把就绪状态的数据拷贝到该数组

    • maxevents:标明 epoll_event 数组最多能接收的数据量,即本次操作最多能获取多少就绪数据

    • timeout:单位为毫秒

      • 0:表示立即返回,非阻塞调用

      • -1:阻塞调用,直到有用户感兴趣的事件就绪为止

      • 大于 0:阻塞调用,阻塞指定时间内如果有事件就绪则提前返回,否则等待指定时间后返回

epoll 的描述符事件有两种触发模式:LT(level trigger)和 ET(edge trigger):

  • LT 模式:当 epoll_wait() 检测到描述符事件到达时,将此事件通知进程,进程可以不立即处理该事件,下次调用 epoll_wait() 会再次通知进程,是默认的一种模式,并且同时支持 Blocking 和 No-Blocking

  • ET 模式:通知之后进程必须立即处理事件,下次再调用 epoll_wait() 时不会再得到事件到达的通知。减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高;只支持 No-Blocking,以避免由于一个 fd 的阻塞读/阻塞写操作把处理多个文件描述符的任务饥饿

epoll 的特点

  • epoll 仅适用于 Linux 系统

  • epoll 使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表(个人理解成哑元节点)

  • 没有最大描述符数量(并发连接)的限制,打开 fd 的上限远大于1024(1G 内存能监听约 10 万个端口)

  • epoll 的时间复杂度 O(1),epoll 理解为 event poll,不同于忙轮询和无差别轮询,调用 epoll_wait 只是轮询就绪链表。当监听列表有设备就绪时调用回调函数,把就绪 fd 放入就绪链表中,并唤醒在 epoll_wait 中阻塞的进程,所以 epoll 实际上是事件驱动(每个事件关联上fd)的,降低了 system call 的时间复杂度

  • epoll 内核中根据每个 fd 上的 callback 函数来实现,只有活跃的 socket 才会主动调用 callback,所以使用 epoll 没有前面两者的线性下降的性能问题,效率提高

  • epoll 注册新的事件是注册到到内核中 epoll 句柄中,不需要每次调用 epoll_wait 时重复拷贝,对比前面两种只需要将描述符从进程缓冲区向内核缓冲区拷贝一次,也可以利用 mmap() 文件映射内存加速与内核空间的消息传递(只是可以用,并没有用)

  • 前面两者要把 current 往设备等待队列中挂一次,epoll 也只把 current 往等待队列上挂一次,但是这里的等待队列并不是设备等待队列,只是一个 epoll 内部定义的等待队列,这样可以节省开销

  • epoll 对多线程编程更有友好,一个线程调用了 epoll_wait() 另一个线程关闭了同一个描述符,也不会产生像 select 和 poll 的不确定情况

3.4 应用

应用场景:

  • select 应用场景:

    • select 的 timeout 参数精度为微秒,poll 和 epoll 为毫秒,因此 select 适用实时性要求比较高的场景,比如核反应堆的控制

    • select 可移植性更好,几乎被所有主流平台所支持

  • poll 应用场景:poll 没有最大描述符数量的限制,适用于平台支持并且对实时性要求不高的情况

  • epoll 应用场景:

    • 运行在 Linux 平台上,有大量的描述符需要同时轮询,并且这些连接最好是长连接

    • 需要同时监控小于 1000 个描述符,没必要使用 epoll,因为这个应用场景下并不能体现 epoll 的优势

    • 需要监控的描述符状态变化多,而且是非常短暂的,就没有必要使用 epoll。因为 epoll 中的所有描述符都存储在内核中,每次对描述符的状态改变都需要通过 epoll_ctl() 进行系统调用,频繁系统调用降低效率,并且 epoll 的描述符存储在内核,不容易调试

四、系统调用和零拷贝

4.1 系统调用

用户空间:用户代码、用户堆栈

内核空间:内核代码、内核调度程序、进程描述符(内核堆栈、thread_info 进程描述符)

  • 进程描述符和用户的进程是一一对应的

  • SYS_API 系统调用:如 read、write,系统调用就是 0X80 中断

  • 进程描述符 pd:进程从用户态切换到内核态时,需要保存用户态时的上下文信息在 PCB 中

  • 线程上下文:用户程序基地址,程序计数器、cpu cache、寄存器等,方便程序切回用户态时恢复现场

  • 内核堆栈:系统调用函数也是要创建变量的,这些变量在内核堆栈上分配

80中断:在用户程序中调用操作系统提供的核心态级别的子功能,为了系统安全需要进行用户态和内核态转换,状态的转换需要进行 CPU 中断,中断分为硬中断和软中断:

  • 硬中断:如网络传输中,数据到达网卡后,网卡经过一系列操作后发起硬件中断

  • 软中断:如程序运行过程中本身产生的一些中断

    • 发起 0X80 中断

    • 程序执行碰到除 0 异常

系统调用 system_call 函数所对应的中断指令编号是 0X80(十进制是 8×16=128),而该指令编号对应的就是系统调用程序的入口,所以称系统调用为 80 中断

系统调用的流程:

  • 在 CPU 寄存器里存一个系统调用号,表示哪个系统函数,比如 read

  • 将 CPU 的临时数据都保存到 thread_info 中

  • 执行 80 中断处理程序,找到刚刚存的系统调用号(read),先检查缓存中有没有对应的数据,没有就去磁盘中加载到内核缓冲区,然后从内核缓冲区拷贝到用户空间

  • 最后恢复到用户态,通过 thread_info 恢复现场,用户态继续执行

4.2 零拷贝

DMA (Direct Memory Access) :直接存储器访问,让外部设备不通过 CPU 直接与系统内存交换数据的接口技术

作用:可以解决批量数据的输入/输出问题,使数据的传送速度取决于存储器和外设的工作速度

把内存数据传输到网卡然后发送:

  • 没有 DMA:CPU 读内存数据到 CPU 高速缓存,再写到网卡,这样就把 CPU 的速度拉低到和网卡一个速度

  • 使用 DMA:把数据读到 Socket 内核缓存区(CPU 复制),CPU 分配给 DMA 开始异步操作,DMA 读取 Socket 缓冲区到 DMA 缓冲区,然后写到网卡。DMA 执行完后中断(就是通知) CPU,这时 Socket 内核缓冲区为空,CPU 从用户态切换到内核态,执行中断处理程序,将需要使用 Socket 缓冲区的阻塞进程移到就绪队列

一个完整的 DMA 传输过程必须经历 DMA 请求、DMA 响应、DMA 传输、DMA 结束四个步骤:

DMA 方式是一种完全由硬件进行信息传送的控制方式,通常系统总线由 CPU 管理,在 DMA 方式中,CPU 的主存控制信号被禁止使用,CPU 把总线(地址总线、数据总线、控制总线)让出来由 DMA 控制器接管,用来控制传送的字节数、判断 DMA 是否结束、以及发出 DMA 结束信号,所以 DMA 控制器必须有以下功能:

  • 接受外设发出的 DMA 请求,并向 CPU 发出总线接管请求

  • 当 CPU 发出允许接管信号后,进入 DMA 操作周期

  • 确定传送数据的主存单元地址及长度,并自动修改主存地址计数和传送长度计数

  • 规定数据在主存和外设间的传送方向,发出读写等控制信号,执行数据传送操作

  • 判断 DMA 传送是否结束,发出 DMA 结束信号,使 CPU 恢复正常工作状态(中断)

BIO:传统的 I/O 操作进行了 4 次用户空间与内核空间的上下文切换,以及 4 次数据拷贝:

  • JVM 发出 read 系统调用,OS 上下文切换到内核模式(切换 1)并将数据从网卡或硬盘等设备通过 DMA 读取到内核空间缓冲区(拷贝 1),内核缓冲区实际上是磁盘高速缓存(PageCache)

  • OS 内核将数据复制到用户空间缓冲区(拷贝 2),然后 read 系统调用返回,又会导致一次内核空间到用户空间的上下文切换(切换 2)

  • JVM 处理代码逻辑并发送 write() 系统调用,OS 上下文切换到内核模式(切换3)并从用户空间缓冲区复制数据到内核空间缓冲区(拷贝3)

  • 将内核空间缓冲区中的数据写到 hardware(拷贝4),write 系统调用返回,导致内核空间到用户空间的再次上下文切换(切换4)

流程图中的箭头反过来也成立,可以从网卡获取数据

read 调用图示:read、write 都是系统调用指令

mmap:(Memory Mapped Files)内存映射加 write 实现零拷贝,零拷贝就是没有数据从内核空间复制到用户空间

用户空间和内核空间都使用内存,所以可以共享同一块物理内存地址,省去用户态和内核态之间的拷贝。写网卡时,共享空间的内容拷贝到 Socket 缓冲区,然后交给 DMA 发送到网卡,只需要 3 次复制

进行了 4 次用户空间与内核空间的上下文切换,以及 3 次数据拷贝(2 次 DMA,一次 CPU 复制):

  • 发出 mmap 系统调用,DMA 拷贝到内核缓冲区,映射到共享缓冲区;mmap 系统调用返回,无需拷贝

  • 发出 write 系统调用,将数据从内核缓冲区拷贝到内核 Socket 缓冲区;write 系统调用返回,DMA 将内核空间 Socket 缓冲区中的数据传递到协议引擎

原理:利用操作系统的 Page 来实现文件到物理内存的直接映射,完成映射后对物理内存的操作会被同步到硬盘上

缺点:不可靠,写到 mmap 中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 flush 的时候才把数据真正的写到硬盘

Java NIO 提供了 MappedByteBuffer 类可以用来实现 mmap 内存映射,MappedByteBuffer 类对象只能通过调用 FileChannel.map() 获取

sendfile:sendfile 实现零拷贝,打开文件的文件描述符 fd 和 socket 的 fd 传递给 sendfile,然后经过 3 次复制和 2 次用户态和内核态的切换

原理:数据根本不经过用户态,直接从内核缓冲区进入到 Socket Buffer,由于和用户态完全无关,就减少了两次上下文切换

说明:零拷贝技术是不允许进程对文件内容作进一步的加工的,比如压缩数据再发送

 

sendfile2.4 之后,sendfile 实现了更简单的方式,文件到达内核缓冲区后,不必再将数据全部复制到 socket buffer 缓冲区,而是只将记录数据位置和长度相关等描述符信息保存到 socket buffer,DMA 根据 Socket 缓冲区中描述符提供的位置和偏移量信息直接将内核空间缓冲区中的数据拷贝到协议引擎上(2 次复制 2 次切换)

Java NIO 对 sendfile 的支持是 FileChannel.transferTo()/transferFrom(),把磁盘文件读取 OS 内核缓冲区后的 fileChannel,直接转给 socketChannel 发送,底层就是 sendfile

五、BIO

5.1 InetAddress

一个 InetAddress 类的对象就代表一个 IP 地址对象

成员方法:

  • static InetAddress getLocalHost():获得本地主机 IP 地址对象

  • static InetAddress getByName(String host):根据 IP 地址字符串或主机名获得对应的 IP 地址对象

  • String getHostName():获取主机名

  • String getHostAddress():获得 IP 地址字符串

public class InetAddressDemo {
    public static void main(String[] args) throws Exception {
        // 1.获取本机地址对象
        InetAddress ip = InetAddress.getLocalHost();
        System.out.println(ip.getHostName());//DESKTOP-NNMBHQR
        System.out.println(ip.getHostAddress());//192.168.11.1
        // 2.获取域名ip对象
        InetAddress ip2 = InetAddress.getByName("www.baidu.com");
        System.out.println(ip2.getHostName());//www.baidu.com
        System.out.println(ip2.getHostAddress());//14.215.177.38
        // 3.获取公网IP对象。
        InetAddress ip3 = InetAddress.getByName("182.61.200.6");
        System.out.println(ip3.getHostName());//182.61.200.6
        System.out.println(ip3.getHostAddress());//182.61.200.6
        
        // 4.判断是否能通: ping  5s之前测试是否可通
        System.out.println(ip2.isReachable(5000)); // ping百度
    }
}

5.2 UDP

基本介绍

UDP(User Datagram Protocol)协议的特点:

  • 面向无连接的协议,发送端只管发送,不确认对方是否能收到,速度快,但是不可靠,会丢失数据

  • 尽最大努力交付,没有拥塞控制

  • 基于数据包进行数据传输,发送数据的包的大小限制 64KB 以内

  • 支持一对一、一对多、多对一、多对多的交互通信

UDP 协议的使用场景:在线视频、网络语音、电话

实现UDP

UDP 协议相关的两个类:

  • DatagramPacket(数据包对象):用来封装要发送或要接收的数据,比如:集装箱

  • DatagramSocket(发送对象):用来发送或接收数据包,比如:码头

DatagramPacket

  • DatagramPacket 类:

    public new DatagramPacket(byte[] buf, int length, InetAddress address, int port):创建发送端数据包对象

    • buf:要发送的内容,字节数组

    • length:要发送内容的长度,单位是字节

    • address:接收端的IP地址对象

    • port:接收端的端口号

    public new DatagramPacket(byte[] buf, int length):创建接收端的数据包对象

    • buf:用来存储接收到内容

    • length:能够接收内容的长度

  • DatagramPacket 类常用方法:

    • public int getLength():获得实际接收到的字节个数

    • public byte[] getData():返回数据缓冲区

DatagramSocket

  • DatagramSocket 类构造方法:

    • protected DatagramSocket():创建发送端的 Socket 对象,系统会随机分配一个端口号

    • protected DatagramSocket(int port):创建接收端的 Socket 对象并指定端口号

  • DatagramSocket 类成员方法:

    • public void send(DatagramPacket dp):发送数据包

    • public void receive(DatagramPacket p):接收数据包

    • public void close():关闭数据报套接字

public class UDPClientDemo {
    public static void main(String[] args) throws Exception {
        System.out.println("===启动客户端===");
        // 1.创建一个集装箱对象,用于封装需要发送的数据包!
        byte[] buffer = "我学Java".getBytes();
        DatagramPacket packet = new DatagramPacket(buffer,bubffer.length,InetAddress.getLoclHost,8000);
        // 2.创建一个码头对象
        DatagramSocket socket = new DatagramSocket();
        // 3.开始发送数据包对象
        socket.send(packet);
        socket.close();
    }
}
public class UDPServerDemo{
    public static void main(String[] args) throws Exception {
        System.out.println("==启动服务端程序==");
        // 1.创建一个接收客户都端的数据包对象(集装箱)
        byte[] buffer = new byte[1024*64];
        DatagramPacket packet = new DatagramPacket(buffer, bubffer.length);
        // 2.创建一个接收端的码头对象
        DatagramSocket socket = new DatagramSocket(8000);
        // 3.开始接收
        socket.receive(packet);
        // 4.从集装箱中获取本次读取的数据量
        int len = packet.getLength();
        // 5.输出数据
        // String rs = new String(socket.getData(), 0, len)
        String rs = new String(buffer , 0 , len);
        System.out.println(rs);
        // 6.服务端还可以获取发来信息的客户端的IP和端口。
        String ip = packet.getAddress().getHostAdress();
        int port = packet.getPort();
        socket.close();
    }
}

通讯方式

UDP 通信方式:

  • 单播:用于两个主机之间的端对端通信

  • 组播:用于对一组特定的主机进行通信

    IP : 224.0.1.0

    Socket 对象 : MulticastSocket

  • 广播:用于一个主机对整个局域网上所有主机上的数据通信

    IP : 255.255.255.255

    Socket 对象 : DatagramSocket

5.3 TCP

基本介绍

TCP/IP (Transfer Control Protocol) 协议,传输控制协议

TCP/IP 协议的特点:

  • 面向连接的协议,提供可靠交互,速度慢

  • 点对点的全双工通信

  • 通过三次握手建立连接,连接成功形成数据传输通道;通过四次挥手断开连接

  • 基于字节流进行数据传输,传输数据大小没有限制

TCP 协议的使用场景:文件上传和下载、邮件发送和接收、远程登录

注意:TCP 不会为没有数据的 ACK 超时重传

Socket

TCP 通信也叫 Socket 网络编程,只要代码基于 Socket 开发,底层就是基于了可靠传输的 TCP 通信

双向通信:Java Socket 是全双工的,在任意时刻,线路上存在 A -> BB -> A 的双向信号传输,即使是阻塞 IO,读和写也是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读

TCP 协议相关的类:

  • Socket:一个该类的对象就代表一个客户端程序。

  • ServerSocket:一个该类的对象就代表一个服务器端程序。

Socket 类:

  • 构造方法:

    • Socket(InetAddress address,int port):创建流套接字并将其连接到指定 IP 指定端口号

    • Socket(String host, int port):根据 IP 地址字符串和端口号创建客户端 Socket 对象

      注意事项:执行该方法,就会立即连接指定的服务器,连接成功,则表示三次握手通过,反之抛出异常

  • 常用 API:

    • OutputStream getOutputStream():获得字节输出流对象

    • InputStream getInputStream():获得字节输入流对象

    • void shutdownInput():停止接受

    • void shutdownOutput():停止发送数据,终止通信

    • SocketAddress getRemoteSocketAddress():返回套接字连接到的端点的地址,未连接返回 null

ServerSocket 类:

  • 构造方法:public ServerSocket(int port)

  • 常用 API:public Socket accept(),阻塞等待接收一个客户端的 Socket 管道连接请求,连接成功返回一个 Socket 对象

    三次握手后 TCP 连接建立成功,服务器内核会把连接从 SYN 半连接队列(一次握手时在服务端建立的队列)中移出,移入 accept 全连接队列,等待进程调用 accept 函数时把连接取出。如果进程不能及时调用 accept 函数,就会造成 accept 队列溢出,最终导致建立好的 TCP 连接被丢弃

相当于客户端和服务器建立一个数据管道(虚连接,不是真正的物理连接),管道一般不用 close

实现TCP

开发流程

客户端的开发流程:

  1. 客户端要请求于服务端的 Socket 管道连接

  2. 从 Socket 通信管道中得到一个字节输出流

  3. 通过字节输出流给服务端写出数据

服务端的开发流程:

  1. 用 ServerSocket 注册端口

  2. 接收客户端的 Socket 管道连接

  3. 从 Socket 通信管道中得到一个字节输入流

  4. 从字节输入流中读取客户端发来的数据

  • 如果输出缓冲区空间不够存放主机发送的数据,则会被阻塞,输入缓冲区同理

  • 缓冲区不属于应用程序,属于内核

  • TCP 从输出缓冲区读取数据会加锁阻塞线程

实现通信

需求一:客户端发送一行数据,服务端接收一行数据

public class ClientDemo {
    public static void main(String[] args) throws Exception {
        // 1.客户端要请求于服务端的socket管道连接。
        Socket socket = new Socket("127.0.0.1", 8080);
        // 2.从socket通信管道中得到一个字节输出流
        OutputStream os = socket.getOutputStream();
        // 3.把低级的字节输出流包装成高级的打印流。
        PrintStream ps = new PrintStream(os);
        // 4.开始发消息出去
        ps.println("我是客户端");
        ps.flush();//一般不关闭IO流
        System.out.println("客户端发送完毕~~~~");
    }
}
public class ServerDemo{
    public static void main(String[] args) throws Exception {
        System.out.println("----服务端启动----");
        // 1.注册端口: public ServerSocket(int port)
        ServerSocket serverSocket = new ServerSocket(8080);
        // 2.开始等待接收客户端的Socket管道连接。
        Socket socket = serverSocket.accept();
        // 3.从socket通信管道中得到一个字节输入流。
        InputStream is = socket.getInputStream();
        // 4.把字节输入流转换成字符输入流
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        // 6.按照行读取消息 。
        String line;
        if((line = br.readLine()) != null){
            System.out.println(line);
        }
    }
}

需求二:客户端可以反复发送数据,服务端可以反复数据

public class ClientDemo {
    public static void main(String[] args) throws Exception {
        // 1.客户端要请求于服务端的socket管道连接。
        Socket socket = new Socket("127.0.0.1",8080);
        // 2.从socket通信管道中得到一个字节输出流
        OutputStream os = socket.getOutputStream();
        // 3.把低级的字节输出流包装成高级的打印流。
        PrintStream ps = new PrintStream(os);
        // 4.开始发消息出去
         while(true){
            Scanner sc = new Scanner(System.in);
            System.out.print("请说:");
            ps.println(sc.nextLine());
            ps.flush();
        }
    }
}
public class ServerDemo{
    public static void main(String[] args) throws Exception {
        System.out.println("----服务端启动----");
        // 1.注册端口: public ServerSocket(int port)
        ServerSocket serverSocket = new ServerSocket(8080);
        // 2.开始等待接收客户端的Socket管道连接。
        Socket socket = serverSocket.accept();
        // 3.从socket通信管道中得到一个字节输入流。
        InputStream is = socket.getInputStream();
        // 4.把字节输入流转换成字符输入流
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        // 6.按照行读取消息 。
        String line;
        while((line = br.readLine()) != null){
            System.out.println(line);
        }
    }
}

需求三:实现一个服务端可以同时接收多个客户端的消息

public class ClientDemo {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1",8080);
        OutputStream os = new socket.getOutputStream();
        PrintStream ps = new PrintStream(os);
        while(true){
            Scanner sc = new Scanner(System.in);
            System.out.print("请说:");
            ps.println(sc.nextLine());
            ps.flush();
        }
    }
}
public class ServerDemo{
    public static void main(String[] args) throws Exception {
        System.out.println("----服务端启动----");
        ServerSocket serverSocket = new ServerSocket(8080);
        while(true){
            // 开始等待接收客户端的Socket管道连接。
             Socket socket = serverSocket.accept();
            // 每接收到一个客户端必须为这个客户端管道分配一个独立的线程来处理与之通信。
            new ServerReaderThread(socket).start();
        }
    }
}
class ServerReaderThread extends Thread{
    privat Socket socket;
    public ServerReaderThread(Socket socket){this.socket = socket;}
    @Override
    public void run() {
        try(InputStream is = socket.getInputStream();
            BufferedReader br = new BufferedReader(new InputStreamReader(is))
           ){
            String line;
            while((line = br.readLine()) != null){
                sout(socket.getRemoteSocketAddress() + ":" + line);
            }
        }catch(Exception e){
            sout(socket.getRemoteSocketAddress() + "下线了~~~~~~");
        }
    }
}

伪异步

一个客户端要一个线程,并发越高系统瘫痪的越快,可以在服务端引入线程池,使用线程池来处理与客户端的消息通信

  • 优势:不会引起系统的死机,可以控制并发线程的数量

  • 劣势:同时可以并发的线程将受到限制

public class BIOServer {
    public static void main(String[] args) throws Exception {
        //线程池机制
        //创建一个线程池,如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //创建ServerSocket
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器启动了");
        while (true) {
            System.out.println("线程名字 = " + Thread.currentThread().getName());
            //监听,等待客户端连接
            System.out.println("等待连接....");
            final Socket socket = serverSocket.accept();
            System.out.println("连接到一个客户端");
            //创建一个线程,与之通讯
            newCachedThreadPool.execute(new Runnable() {
                public void run() {
                    //可以和客户端通讯
                    handler(socket);
                }
            });
        }
    }
​
    //编写一个handler方法,和客户端通讯
    public static void handler(Socket socket) {
        try {
            System.out.println("线程名字 = " + Thread.currentThread().getName());
            byte[] bytes = new byte[1024];
            //通过socket获取输入流
            InputStream inputStream = socket.getInputStream();
            int len;
            //循环的读取客户端发送的数据
            while ((len = inputStream.read(bytes)) != -1) {
                System.out.println("线程名字 = " + Thread.currentThread().getName());
                //输出客户端发送的数据
                System.out.println(new String(bytes, 0, read));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("关闭和client的连接");
            try {
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

文件传输

字节流

客户端:本地图片: ‪E:\seazean\图片资源\beautiful.jpg 服务端:服务器路径:E:\seazean\图片服务器

UUID. randomUUID() : 方法生成随机的文件名

socket.shutdownOutput():这个必须执行,不然服务器会一直循环等待数据,最后文件损坏,程序报错

//常量包
public class Constants {
    public static final String SRC_IMAGE = "D:\\seazean\\图片资源\\beautiful.jpg";
    public static final String SERVER_DIR = "D:\\seazean\\图片服务器\\";
    public static final String SERVER_IP = "127.0.0.1";
    public static final int SERVER_PORT = 8888;
​
}
public class ClientDemo {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket(Constants.ERVER_IP,Constants.SERVER_PORT);
        BufferedOutputStream bos=new BufferedOutputStream(socket.getOutputStream());
        //提取本机的图片上传给服务端。Constants.SRC_IMAGE
        BufferedInputStream bis = new BufferedInputStream(new FileInputStream());
        byte[] buffer = new byte[1024];
        int len ;
        while((len = bis.read(buffer)) != -1) {
            bos.write(buffer, 0 ,len);
        }
        bos.flush();// 刷新图片数据到服务端!!
        socket.shutdownOutput();// 告诉服务端我的数据已经发送完毕,不要在等我了!
        bis.close();
        
        //等待着服务端的响应数据!!
        BufferedReader br = new BufferedReader(
                         new InputStreamReader(socket.getInputStream()));
        System.out.println("收到服务端响应:"+br.readLine());
    }
}
public class ServerDemo {
    public static void main(String[] args) throws Exception {
        System.out.println("----服务端启动----");
        // 1.注册端口: 
        ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT);
        // 2.定义一个循环不断的接收客户端的连接请求
        while(true){
            // 3.开始等待接收客户端的Socket管道连接。
            Socket socket = serverSocket.accept();
            // 4.每接收到一个客户端必须为这个客户端管道分配一个独立的线程来处理与之通信。
            new ServerReaderThread(socket).start();
        }
    }
}
class ServerReaderThread extends Thread{
    private Socket socket ;
    public ServerReaderThread(Socket socket){this.socket = socket;}
    @Override
    public void run() {
        try{
            InputStream is = socket.getInputStream();
            BufferedInputStream bis = new BufferedInputStream(is);
            BufferedOutputStream bos = new BufferedOutputStream(
                new FileOutputStream
                (Constants.SERVER_DIR+UUID.randomUUID().toString()+".jpg"));
            byte[] buffer = new byte[1024];
            int len;
            while((len = bis.read(buffer)) != -1){
                bos.write(buffer,0,len);
            }
            bos.close();
            System.out.println("服务端接收完毕了!");
            
            // 4.响应数据给客户端
            PrintStream ps = new PrintStream(socket.getOutputStream());
            ps.println("您好,已成功接收您上传的图片!");
            ps.flush();
            Thread.sleep(10000);
        }catch (Exception e){
            sout(socket.getRemoteSocketAddress() + "下线了");
        }
    }
}

数据流

构造方法:

  • DataOutputStream(OutputStream out) : 创建一个新的数据输出流,以将数据写入指定的底层输出流

  • DataInputStream(InputStream in) : 创建使用指定的底层 InputStream 的 DataInputStream

常用API:

  • final void writeUTF(String str) : 使用机器无关的方式使用 UTF-8 编码将字符串写入底层输出流

  • final String readUTF() : 读取以 modified UTF-8 格式编码的 Unicode 字符串,返回 String 类型

public class Client {
    public static void main(String[] args) {
        InputStream is = new FileInputStream("path");
            //  1、请求与服务端的Socket链接
            Socket socket = new Socket("127.0.0.1" , 8888);
            //  2、把字节输出流包装成一个数据输出流
            DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
            //  3、先发送上传文件的后缀给服务端
            dos.writeUTF(".png");
            //  4、把文件数据发送给服务端进行接收
            byte[] buffer = new byte[1024];
            int len;
            while((len = is.read(buffer)) > 0 ){
                dos.write(buffer , 0 , len);
            }
            dos.flush();
            Thread.sleep(10000);
    }
}
​
public class Server {
    public static void main(String[] args) {
        ServerSocket ss = new ServerSocket(8888);
        Socket socket = ss.accept();
        // 1、得到一个数据输入流读取客户端发送过来的数据
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        // 2、读取客户端发送过来的文件类型
        String suffix = dis.readUTF();
        // 3、定义一个字节输出管道负责把客户端发来的文件数据写出去
        OutputStream os = new FileOutputStream("path"+
                    UUID.randomUUID().toString()+suffix);
        // 4、从数据输入流中读取文件数据,写出到字节输出流中去
        byte[] buffer = new byte[1024];
        int len;
        while((len = dis.read(buffer)) > 0){
            os.write(buffer,0, len);
        }
        os.close();
        System.out.println("服务端接收文件保存成功!");
    }
}

六、NIO

6.1 基本介绍

NIO的介绍

Java NIO(New IO、Java non-blocking IO),从 Java 1.4 版本开始引入的一个新的 IO API,可以替代标准的 Java IO API,NIO 支持面向缓冲区的、基于通道的 IO 操作,以更加高效的方式进行文件的读写操作

  • NIO 有三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)

  • NIO 是非阻塞 IO,传统 IO 的 read 和 write 只能阻塞执行,线程在读写 IO 期间不能干其他事情,比如调用 socket.accept(),如果服务器没有数据传输过来,线程就一直阻塞,而 NIO 中可以配置 Socket 为非阻塞模式

  • NIO 可以做到用一个线程来处理多个操作的。假设有 1000 个请求过来,根据实际情况可以分配 20 或者 80 个线程来处理,不像之前的阻塞 IO 那样分配 1000 个

NIO 和 BIO 的比较:

  • BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多

  • BIO 是阻塞的,NIO 则是非阻塞的

  • BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel 和 Buffer 进行操作,数据从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector 用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

    NIO BIO
    面向缓冲区(Buffer) 面向流(Stream)
    非阻塞(Non Blocking IO) 阻塞IO(Blocking IO)
    选择器(Selectors)

6.2 实现原理

NIO 三大核心部分:Channel (通道)、Buffer (缓冲区)、Selector (选择器)

  • Buffer 缓冲区

    缓冲区本质是一块可以写入数据、读取数据的内存,底层是一个数组,这块内存被包装成 NIO Buffer 对象,并且提供了方法用来操作这块内存,相比较直接对数组的操作,Buffer 的 API 更加容易操作和管理

  • Channel 通道

    Java NIO 的通道类似流,不同的是既可以从通道中读取数据,又可以写数据到通道,流的读写通常是单向的,通道可以非阻塞读取和写入通道,支持读取或写入缓冲区,也支持异步地读写

  • Selector 选择器

    Selector 是一个 Java NIO 组件,能够检查一个或多个 NIO 通道,并确定哪些通道已经准备好进行读取或写入,这样一个单独的线程可以管理多个 channel,从而管理多个网络连接,提高效率

NIO 的实现框架:

  • 每个 Channel 对应一个 Buffer

  • 一个线程对应 Selector , 一个 Selector 对应多个 Channel(连接)

  • 程序切换到哪个 Channel 是由事件决定的,Event 是一个重要的概念

  • Selector 会根据不同的事件,在各个通道上切换

  • Buffer 是一个内存块 , 底层是一个数组

  • 数据的读取写入是通过 Buffer 完成的 , BIO 中要么是输入流,或者是输出流,不能双向,NIO 的 Buffer 是可以读也可以写, flip() 切换 Buffer 的工作模式

Java NIO 系统的核心在于:通道和缓冲区,通道表示打开的 IO 设备(例如:文件、 套接字)的连接。若要使用 NIO 系统,获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区,然后操作缓冲区,对数据进行处理。简而言之,Channel 负责传输, Buffer 负责存取数据

6.3 缓冲区

基本介绍

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,用于特定基本数据类型的容器,用于与 NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的

Buffer 底层是一个数组,可以保存多个相同类型的数据,根据数据类型不同 ,有以下 Buffer 常用子类:ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer

基本属性

  • 容量(capacity):作为一个内存块,Buffer 具有固定大小,缓冲区容量不能为负,并且创建后不能更改

  • 限制 (limit):表示缓冲区中可以操作数据的大小(limit 后数据不能进行读写),缓冲区的限制不能为负,并且不能大于其容量。写入模式,limit 等于 buffer 的容量;读取模式下,limit 等于写入的数据量

  • 位置(position):下一个要读取或写入的数据的索引,缓冲区的位置不能为负,并且不能大于其限制

  • 标记(mark)与重置(reset):标记是一个索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的位置,可以通过调用 reset() 方法恢复到这个 position

  • 位置、限制、容量遵守以下不变式: 0 <= position <= limit <= capacity

常用API

static XxxBuffer allocate(int capacity):创建一个容量为 capacity 的 XxxBuffer 对象

Buffer 基本操作:

方法 说明
public Buffer clear() 清空缓冲区,不清空内容,将位置设置为零,限制设置为容量
public Buffer flip() 翻转缓冲区,将缓冲区的界限设置为当前位置,position 置 0
public int capacity() 返回 Buffer的 capacity 大小
public final int limit() 返回 Buffer 的界限 limit 的位置
public Buffer limit(int n) 设置缓冲区界限为 n
public Buffer mark() 在此位置对缓冲区设置标记
public final int position() 返回缓冲区的当前位置 position
public Buffer position(int n) 设置缓冲区的当前位置为n
public Buffer reset() 将位置 position 重置为先前 mark 标记的位置
public Buffer rewind() 将位置设为为 0,取消设置的 mark
public final int remaining() 返回当前位置 position 和 limit 之间的元素个数
public final boolean hasRemaining() 判断缓冲区中是否还有元素
public static ByteBuffer wrap(byte[] array) 将一个字节数组包装到缓冲区中
abstract ByteBuffer asReadOnlyBuffer() 创建一个新的只读字节缓冲区
public abstract ByteBuffer compact() 缓冲区当前位置与其限制(如果有)之间的字节被复制到缓冲区的开头

Buffer 数据操作:

方法 说明
public abstract byte get() 读取该缓冲区当前位置的单个字节,然后位置 + 1
public ByteBuffer get(byte[] dst) 读取多个字节到字节数组 dst 中
public abstract byte get(int index) 读取指定索引位置的字节,不移动 position
public abstract ByteBuffer put(byte b) 将给定单个字节写入缓冲区的当前位置,position+1
public final ByteBuffer put(byte[] src) 将 src 字节数组写入缓冲区的当前位置
public abstract ByteBuffer put(int index, byte b) 将指定字节写入缓冲区的索引位置,不移动 position

提示:"\n",占用两个字节

读写数据

使用 Buffer 读写数据一般遵循以下四个步骤:

  • 写入数据到 Buffer

  • 调用 flip()方法,转换为读取模式

  • 从 Buffer 中读取数据

  • 调用 buffer.clear() 方法清除缓冲区(不是清空了数据,只是重置指针)

public class TestBuffer {
    @Test
    public void test(){
        String str = "seazean";
        //1. 分配一个指定大小的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        System.out.println("-----------------allocate()----------------");
        System.out.println(bufferf.position());//0
        System.out.println(buffer.limit());//1024
        System.out.println(buffer.capacity());//1024
        
        //2. 利用 put() 存入数据到缓冲区中
        buffer.put(str.getBytes());
        System.out.println("-----------------put()----------------");
        System.out.println(bufferf.position());//7
        System.out.println(buffer.limit());//1024
        System.out.println(buffer.capacity());//1024
        
        //3. 切换读取数据模式
        buffer.flip();
        System.out.println("-----------------flip()----------------");
        System.out.println(buffer.position());//0
        System.out.println(buffer.limit());//7
        System.out.println(buffer.capacity());//1024
        
        //4. 利用 get() 读取缓冲区中的数据
        byte[] dst = new byte[buffer.limit()];
        buffer.get(dst);
        System.out.println(dst.length);
        System.out.println(new String(dst, 0, dst.length));
        System.out.println(buffer.position());//7
        System.out.println(buffer.limit());//7
       
        //5. clear() : 清空缓冲区. 但是缓冲区中的数据依然存在,但是处于“被遗忘”状态
        System.out.println(buffer.hasRemaining());//true
        buffer.clear();
        System.out.println(buffer.hasRemaining());//true
        System.out.println("-----------------clear()----------------");
        System.out.println(buffer.position());//0
        System.out.println(buffer.limit());//1024
        System.out.println(buffer.capacity());//1024
    }
}

粘包拆包

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔,但这些数据在接收时,被进行了重新组合

// Hello,world\n
// I'm zhangsan\n
// How are you?\n
------ > 黏包,半包
// Hello,world\nI'm zhangsan\nHo
// w are you?\n
public static void main(String[] args) {
    ByteBuffer source = ByteBuffer.allocate(32);
    //                     11            24
    source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
    split(source);
​
    source.put("w are you?\nhaha!\n".getBytes());
    split(source);
}
​
private static void split(ByteBuffer source) {
    source.flip();
    int oldLimit = source.limit();
    for (int i = 0; i < oldLimit; i++) {
        if (source.get(i) == '\n') {
            // 根据数据的长度设置缓冲区
            ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
            // 0 ~ limit
            source.limit(i + 1);
            target.put(source); // 从source 读,向 target 写
            // debugAll(target); 访问 buffer 的方法
            source.limit(oldLimit);
        }
    }
    // 访问过的数据复制到开头
    source.compact();
}

6.4 直接内存

基本介绍

Byte Buffer 有两种类型,一种是基于直接内存(也就是非堆内存),另一种是非直接内存(也就是堆内存)

Direct Memory 优点:

  • Java 的 NIO 库允许 Java 程序使用直接内存,使用 native 函数直接分配堆外内存

  • 读写性能高,读写频繁的场合可能会考虑使用直接内存

  • 大大提高 IO 性能,避免了在 Java 堆和 native 堆来回复制数据

直接内存缺点:

  • 不能使用内核缓冲区 Page Cache 的缓存优势,无法缓存最近被访问的数据和使用预读功能

  • 分配回收成本较高,不受 JVM 内存回收管理

  • 可能导致 OutOfMemoryError 异常:OutOfMemoryError: Direct buffer memory

  • 回收依赖 System.gc() 的调用,但这个调用 JVM 不保证执行、也不保证何时执行,行为是不可控的。程序一般需要自行管理,成对去调用 malloc、free

应用场景:

  • 传输很大的数据文件,数据的生命周期很长,导致 Page Cache 没有起到缓存的作用,一般采用直接 IO 的方式

  • 适合频繁的 IO 操作,比如网络并发场景

数据流的角度:

  • 非直接内存的作用链:本地 IO → 内核缓冲区→ 用户(JVM)缓冲区 →内核缓冲区 → 本地 IO

  • 直接内存是:本地 IO → 直接内存 → 本地 IO

JVM 直接内存图解:

通信原理

堆外内存不受 JVM GC 控制,可以使用堆外内存进行通信,防止 GC 后缓冲区位置发生变化的情况

NIO 使用的 SocketChannel 也是使用的堆外内存,源码解析:

  • SocketChannel#write(java.nio.ByteBuffer) → SocketChannelImpl#write(java.nio.ByteBuffer)

    public int write(ByteBuffer var1) throws IOException {
         do {
             var3 = IOUtil.write(this.fd, var1, -1L, nd);
         } while(var3 == -3 && this.isOpen());
    }
  • IOUtil#write(java.io.FileDescriptor, java.nio.ByteBuffer, long, sun.nio.ch.NativeDispatcher)

    static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) {
        // 【判断是否是直接内存,是则直接写出,不是则封装到直接内存】
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
            //....
            // 从堆内buffer拷贝到堆外buffer
            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
            var8.put(var1);
            //...
            // 从堆外写到内核缓冲区
            int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
        }
    }
  • 读操作相同

分配回收

直接内存创建 Buffer 对象:static XxxBuffer allocateDirect(int capacity)

DirectByteBuffer 源码分析:

DirectByteBuffer(int cap) { 
    //....
    long base = 0;
    try {
        // 分配直接内存
        base = unsafe.allocateMemory(size);
    }
    // 内存赋值
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    // 创建回收函数
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
}
private static class Deallocator implements Runnable {
    public void run() {
        unsafe.freeMemory(address);
        //...
    }
}

分配和回收原理

  • 使用了 Unsafe 对象的 allocateMemory 方法完成直接内存的分配,setMemory 方法完成赋值

  • ByteBuffer 的实现类内部,使用了 Cleaner(虚引用)来监测 ByteBuffer 对象,一旦 ByteBuffer 对象被垃圾回收,那么 ReferenceHandler 线程通过 Cleaner 的 clean 方法调用 Deallocator 的 run方法,最后通过 freeMemory 来释放直接内存

/**
 * 直接内存分配的底层原理:Unsafe
 */
public class Demo1_27 {
    static int _1Gb = 1024 * 1024 * 1024;
​
    public static void main(String[] args) throws IOException {
        Unsafe unsafe = getUnsafe();
        // 分配内存
        long base = unsafe.allocateMemory(_1Gb);
        unsafe.setMemory(base, _1Gb, (byte) 0);
        System.in.read();
        // 释放内存
        unsafe.freeMemory(base);
        System.in.read();
    }
​
    public static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            Unsafe unsafe = (Unsafe) f.get(null);
            return unsafe;
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }
}

共享内存

FileChannel 提供 map 方法返回 MappedByteBuffer 对象,把文件映射到内存,通常情况可以映射整个文件,如果文件比较大,可以进行分段映射,完成映射后对物理内存的操作会被同步到硬盘上

FileChannel 中的成员属性:

  • MapMode.mode:内存映像文件访问的方式,共三种:

    • MapMode.READ_ONLY:只读,修改得到的缓冲区将导致抛出异常

    • MapMode.READ_WRITE:读/写,对缓冲区的更改最终将写入文件,但此次修改对映射到同一文件的其他程序不一定是可见

    • MapMode.PRIVATE:私用,可读可写,但是修改的内容不会写入文件,只是 buffer 自身的改变

  • public final FileLock lock():获取此文件通道的排他锁

MappedByteBuffer,可以让文件在直接内存(堆外内存)中进行修改,这种方式叫做内存映射,可以直接调用系统底层的缓存,没有 JVM 和 OS 之间的复制操作,提高了传输效率,作用:

  • 可以用于进程间的通信,能达到共享内存页的作用,但在高并发下要对文件内存进行加锁,防止出现读写内容混乱和不一致性,Java 提供了文件锁 FileLock,但在父/子进程中锁定后另一进程会一直等待,效率不高

  • 读写那些太大而不能放进内存中的文件,分段映射

MappedByteBuffer 较之 ByteBuffer 新增的三个方法:

  • final MappedByteBuffer force():缓冲区是 READ_WRITE 模式下,对缓冲区内容的修改强制写入文件

  • final MappedByteBuffer load():将缓冲区的内容载入物理内存,并返回该缓冲区的引用

  • final boolean isLoaded():如果缓冲区的内容在物理内存中,则返回真,否则返回假

public class MappedByteBufferTest {
    public static void main(String[] args) throws Exception {
        // 读写模式
        RandomAccessFile ra = new RandomAccessFile("1.txt", "rw");
        // 获取对应的通道
        FileChannel channel = ra.getChannel();
​
        /**
         * 参数1  FileChannel.MapMode.READ_WRITE 使用的读写模式
         * 参数2  0: 文件映射时的起始位置
         * 参数3  5: 是映射到内存的大小(不是索引位置),即将 1.txt 的多少个字节映射到内存
         * 可以直接修改的范围就是 0-5
         * 实际类型 DirectByteBuffer
         */
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
​
        buffer.put(0, (byte) 'H');
        buffer.put(3, (byte) '9');
        buffer.put(5, (byte) 'Y');  //IndexOutOfBoundsException
​
        ra.close();
        System.out.println("修改成功~~");
    }
}

从硬盘上将文件读入内存,要经过文件系统进行数据拷贝,拷贝操作是由文件系统和硬件驱动实现。通过内存映射的方法访问硬盘上的文件,拷贝数据的效率要比 read 和 write 系统调用高:

  • read() 是系统调用,首先将文件从硬盘拷贝到内核空间的一个缓冲区,再将这些数据拷贝到用户空间,实际上进行了两次数据拷贝

  • mmap() 也是系统调用,但没有进行数据拷贝,当缺页中断发生时,直接将文件从硬盘拷贝到共享内存,只进行了一次数据拷贝

注意:mmap 的文件映射,在 Full GC 时才会进行释放,如果需要手动清除内存映射文件,可以反射调用 sun.misc.Cleaner 方法

6.5 通道

基本介绍

通道(Channel):表示 IO 源与目标打开的连接,Channel 类似于传统的流,只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互

  1. NIO 的通道类似于流,但有些区别如下:

    • 通道可以同时进行读写,而流只能读或者只能写

    • 通道可以实现异步读写数据

    • 通道可以从缓冲读数据,也可以写数据到缓冲

  2. BIO 中的 Stream 是单向的,NIO 中的 Channel 是双向的,可以读操作,也可以写操作

  3. Channel 在 NIO 中是一个接口:public interface Channel extends Closeable{}

Channel 实现类:

  • FileChannel:用于读取、写入、映射和操作文件的通道,只能工作在阻塞模式下

    • 通过 FileInputStream 获取的 Channel 只能读

    • 通过 FileOutputStream 获取的 Channel 只能写

    • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

  • DatagramChannel:通过 UDP 读写网络中的数据通道

  • SocketChannel:通过 TCP 读写网络中的数据

  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel

    提示:ServerSocketChanne 类似 ServerSocket、SocketChannel 类似 Socket

常用API

获取 Channel 方式:

  • 对支持通道的对象调用 getChannel() 方法

  • 通过通道的静态方法 open() 打开并返回指定通道

  • 使用 Files 类的静态方法 newByteChannel() 获取字节通道

Channel 基本操作:读写都是相对于内存来看,也就是缓冲区

方法 说明
public abstract int read(ByteBuffer dst) 从 Channel 中读取数据到 ByteBuffer,从 position 开始储存
public final long read(ByteBuffer[] dsts) 将 Channel 中的数据分散到 ByteBuffer[]
public abstract int write(ByteBuffer src) 将 ByteBuffer 中的数据写入 Channel,从 position 开始写出
public final long write(ByteBuffer[] srcs) 将 ByteBuffer[] 到中的数据聚集到 Channel
public abstract long position() 返回此通道的文件位置
FileChannel position(long newPosition) 设置此通道的文件位置
public abstract long size() 返回此通道的文件的当前大小

SelectableChannel 的操作 API

方法 说明
SocketChannel accept() 如果通道处于非阻塞模式,没有请求连接时此方法将立即返回 NULL,否则将阻塞直到有新的连接或发生 I/O 错误,通过该方法返回的套接字通道将处于阻塞模式
SelectionKey register(Selector sel, int ops) 将通道注册到选择器上,并指定监听事件
SelectionKey register(Selector sel, int ops, Object att) 将通道注册到选择器上,并在当前通道绑定一个附件对象,Object 代表可以是任何类型

文件读写

public class ChannelTest {
    @Test
    public void write() throws Exception{
        // 1、字节输出流通向目标文件
        FileOutputStream fos = new FileOutputStream("data01.txt");
        // 2、得到字节输出流对应的通道  【FileChannel】
        FileChannel channel = fos.getChannel();
        // 3、分配缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("hello,黑马Java程序员!".getBytes());
        // 4、把缓冲区切换成写出模式
        buffer.flip();
        channel.write(buffer);
        channel.close();
        System.out.println("写数据到文件中!");
    }
    @Test
    public void read() throws Exception {
        // 1、定义一个文件字节输入流与源文件接通
        FileInputStream fis = new FileInputStream("data01.txt");
        // 2、需要得到文件字节输入流的文件通道
        FileChannel channel = fis.getChannel();
        // 3、定义一个缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 4、读取数据到缓冲区
        channel.read(buffer);
        buffer.flip();
        // 5、读取出缓冲区中的数据并输出即可
        String rs = new String(buffer.array(),0,buffer.remaining());
        System.out.println(rs);
    }
}

文件复制

Channel 的方法:sendfile 实现零拷贝

  • abstract long transferFrom(ReadableByteChannel src, long position, long count):从给定的可读字节通道将字节传输到该通道的文件中

    • src:源通道

    • position:文件中要进行传输的位置,必须是非负的

    • count:要传输的最大字节数,必须是非负的

  • abstract long transferTo(long position, long count, WritableByteChannel target):将该通道文件的字节传输到给定的可写字节通道。

    • position:传输开始的文件中的位置; 必须是非负的

    • count:要传输的最大字节数; 必须是非负的

    • target:目标通道

文件复制的两种方式:

  1. Buffer

  2. 使用上述两种方法

public class ChannelTest {
    @Test
    public void copy1() throws Exception {
        File srcFile = new File("C:\\壁纸.jpg");
        File destFile = new File("C:\\Users\\壁纸new.jpg");
        // 得到一个字节字节输入流
        FileInputStream fis = new FileInputStream(srcFile);
        // 得到一个字节输出流
        FileOutputStream fos = new FileOutputStream(destFile);
        // 得到的是文件通道
        FileChannel isChannel = fis.getChannel();
        FileChannel osChannel = fos.getChannel();
        // 分配缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while(true){
            // 必须先清空缓冲然后再写入数据到缓冲区
            buffer.clear();
            // 开始读取一次数据
            int flag = isChannel.read(buffer);
            if(flag == -1){
                break;
            }
            // 已经读取了数据 ,把缓冲区的模式切换成可读模式
            buffer.flip();
            // 把数据写出到
            osChannel.write(buffer);
        }
        isChannel.close();
        osChannel.close();
        System.out.println("复制完成!");
    }
    
    @Test
    public void copy02() throws Exception {
        // 1、字节输入管道
        FileInputStream fis = new FileInputStream("data01.txt");
        FileChannel isChannel = fis.getChannel();
        // 2、字节输出流管道
        FileOutputStream fos = new FileOutputStream("data03.txt");
        FileChannel osChannel = fos.getChannel();
        // 3、复制
        osChannel.transferFrom(isChannel,isChannel.position(),isChannel.size());
        isChannel.close();
        osChannel.close();
    }
    
    @Test
    public void copy03() throws Exception {
        // 1、字节输入管道
        FileInputStream fis = new FileInputStream("data01.txt");
        FileChannel isChannel = fis.getChannel();
        // 2、字节输出流管道
        FileOutputStream fos = new FileOutputStream("data04.txt");
        FileChannel osChannel = fos.getChannel();
        // 3、复制
        isChannel.transferTo(isChannel.position() , isChannel.size() , osChannel);
        isChannel.close();
        osChannel.close();
    }
}

分散聚集

分散读取(Scatter ):是指把 Channel 通道的数据读入到多个缓冲区中去

聚集写入(Gathering ):是指将多个 Buffer 中的数据聚集到 Channel

public class ChannelTest {
    @Test
    public void test() throws IOException{
        // 1、字节输入管道
        FileInputStream is = new FileInputStream("data01.txt");
        FileChannel isChannel = is.getChannel();
        // 2、字节输出流管道
        FileOutputStream fos = new FileOutputStream("data02.txt");
        FileChannel osChannel = fos.getChannel();
        // 3、定义多个缓冲区做数据分散
        ByteBuffer buffer1 = ByteBuffer.allocate(4);
        ByteBuffer buffer2 = ByteBuffer.allocate(1024);
        ByteBuffer[] buffers = {buffer1 , buffer2};
        // 4、从通道中读取数据分散到各个缓冲区
        isChannel.read(buffers);
        // 5、从每个缓冲区中查询是否有数据读取到了
        for(ByteBuffer buffer : buffers){
            buffer.flip();// 切换到读数据模式
            System.out.println(new String(buffer.array() , 0 , buffer.remaining()));
        }
        // 6、聚集写入到通道
        osChannel.write(buffers);
        isChannel.close();
        osChannel.close();
        System.out.println("文件复制~~");
    }
}

6.6 选择器

基本介绍

选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个通道的状况,利用 Selector 可使一个单独的线程管理多个 Channel,Selector 是非阻塞 IO 的核心

  • Selector 能够检测多个注册的通道上是否有事件发生(多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,就获取事件然后针对每个事件进行相应的处理,就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求

  • 只有在连接/通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程

  • 避免了多线程之间的上下文切换导致的开销

常用API

创建 Selector:Selector selector = Selector.open();

向选择器注册通道:SelectableChannel.register(Selector sel, int ops, Object att)

  • 参数一:选择器,指定当前 Channel 注册到的选择器

  • 参数二:选择器对通道的监听事件,监听的事件类型用四个常量表示

    • 读 : SelectionKey.OP_READ (1)

    • 写 : SelectionKey.OP_WRITE (4)

    • 连接 : SelectionKey.OP_CONNECT (8)

    • 接收 : SelectionKey.OP_ACCEPT (16)

    • 若不止监听一个事件,使用位或操作符连接:int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE

  • 参数三:可以关联一个附件,可以是任何对象

Selector API

方法 说明
public static Selector open() 打开选择器
public abstract void close() 关闭此选择器
public abstract int select() 阻塞选择一组通道准备好进行 I/O 操作的键
public abstract int select(long timeout) 阻塞等待 timeout 毫秒
public abstract int selectNow() 获取一下,不阻塞,立刻返回
public abstract Selector wakeup() 唤醒正在阻塞的 selector
public abstract Set<SelectionKey> selectedKeys() 返回此选择器的选择键集

SelectionKey API:

方法 说明
public abstract void cancel() 取消该键的通道与其选择器的注册
public abstract SelectableChannel channel() 返回创建此键的通道,该方法在取消键之后仍将返回通道
public final Object attachment() 返回当前 key 关联的附件
public final boolean isAcceptable() 检测此密钥的通道是否已准备好接受新的套接字连接
public final boolean isConnectable() 检测此密钥的通道是否已完成或未完成其套接字连接操作
public final boolean isReadable() 检测此密钥的频道是否可以阅读
public final boolean isWritable() 检测此密钥的通道是否准备好进行写入

基本步骤:

//1.获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2.切换非阻塞模式
ssChannel.configureBlocking(false);
//3.绑定连接
ssChannel.bin(new InetSocketAddress(9999));
//4.获取选择器
Selector selector = Selector.open();
//5.将通道注册到选择器上,并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

6.7 NIO实现

常用API

  • SelectableChannel_API

    方法 说明
    public final SelectableChannel configureBlocking(boolean block) 设置此通道的阻塞模式
    public final SelectionKey register(Selector sel, int ops) 向给定的选择器注册此通道,并选择关注的的事件
  • SocketChannel_API:

    方法 说明
    public static SocketChannel open() 打开套接字通道
    public static SocketChannel open(SocketAddress remote) 打开套接字通道并连接到远程地址
    public abstract boolean connect(SocketAddress remote) 连接此通道的到远程地址
    public abstract SocketChannel bind(SocketAddress local) 将通道的套接字绑定到本地地址
    public abstract SocketAddress getLocalAddress() 返回套接字绑定的本地套接字地址
    public abstract SocketAddress getRemoteAddress() 返回套接字连接的远程套接字地址
  • ServerSocketChannel_API:

    方法 说明
    public static ServerSocketChannel open() 打开服务器套接字通道
    public final ServerSocketChannel bind(SocketAddress local) 将通道的套接字绑定到本地地址,并配置套接字以监听连接
    public abstract SocketChannel accept() 接受与此通道套接字的连接,通过此方法返回的套接字通道将处于阻塞模式
    • 如果 ServerSocketChannel 处于非阻塞模式,如果没有挂起连接,则此方法将立即返回 null

    • 如果通道处于阻塞模式,如果没有挂起连接将无限期地阻塞,直到有新的连接或发生 I/O 错误

代码实现

服务端 :

  1. 获取通道,当客户端连接服务端时,服务端会通过 ServerSocketChannel.accept 得到 SocketChannel

  2. 切换非阻塞模式

  3. 绑定连接

  4. 获取选择器

  5. 将通道注册到选择器上,并且指定监听接收事件

  6. 轮询式的获取选择器上已经准备就绪的事件

客户端:

  1. 获取通道:SocketChannel sc = SocketChannel.open(new InetSocketAddress(HOST, PORT))

  2. 切换非阻塞模式

  3. 分配指定大小的缓冲区:ByteBuffer buffer = ByteBuffer.allocate(1024)

  4. 发送数据给服务端

37 行代码,如果判断条件改为 !=-1,需要客户端 close 一下

public class Server {
    public static void main(String[] args){
        // 1、获取通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 2、切换为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 3、绑定连接的端口
        serverSocketChannel.bind(new InetSocketAddress(9999));
        // 4、获取选择器Selector
        Selector selector = Selector.open();
        // 5、将通道都注册到选择器上去,并且开始指定监听接收事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 6、使用Selector选择器阻塞等待轮已经就绪好的事件
        while (selector.select() > 0) {
            System.out.println("----开始新一轮的时间处理----");
            // 7、获取选择器中的所有注册的通道中已经就绪好的事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectionKeys.iterator();
            // 8、开始遍历这些准备好的事件
            while (it.hasNext()) {
                SelectionKey key = it.next();// 提取当前这个事件
                // 9、判断这个事件具体是什么
                if (key.isAcceptable()) {
                    // 10、直接获取当前接入的客户端通道
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    // 11 、切换成非阻塞模式
                    socketChannel.configureBlocking(false);
                    /*
                     ByteBuffer buffer = ByteBuffer.allocate(16);
                     // 将一个 byteBuffer 作为附件【关联】到 selectionKey 上
                     SelectionKey scKey = sc.register(selector, 0, buffer);
                    */
                    // 12、将本客户端通道注册到选择器
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 13、获取当前选择器上的读就绪事件
                    SelectableChannel channel = key.channel();
                    SocketChannel socketChannel = (SocketChannel) channel;
                    // 14、读取数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // 获取关联的附件
                    // ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int len;
                    while ((len = socketChannel.read(buffer)) > 0) {
                        buffer.flip();
                        System.out.println(socketChannel.getRemoteAddress() + ":" + new String(buffer.array(), 0, len));
                        buffer.clear();// 清除之前的数据
                    }
                }
                // 删除当前的 selectionKey,防止重复操作
                it.remove();
            }
        }
    }
}
public class Client {
    public static void main(String[] args) throws Exception {
        // 1、获取通道
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
        // 2、切换成非阻塞模式
        socketChannel.configureBlocking(false);
        // 3、分配指定缓冲区大小
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 4、发送数据给服务端
        Scanner sc = new Scanner(System.in);
        while (true){
            System.out.print("请说:");
            String msg = sc.nextLine();
            buffer.put(("Client:" + msg).getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
        }
    }
}

七、AIO

Java AIO(NIO.2) : AsynchronousI/O,异步非阻塞,采用了 Proactor 模式。服务器实现模式为一个有效请求一个线程,客户端的 I/O 请求都是由 OS 先完成了再通知服务器应用去启动线程进行处理

AIO异步非阻塞,基于NIO的,可以称之为NIO2.0
  BIO                     NIO                                AIO        
Socket                SocketChannel                    AsynchronousSocketChannel
ServerSocket          ServerSocketChannel          AsynchronousServerSocketChannel

当进行读写操作时,调用 API 的 read 或 write 方法,这两种方法均为异步的,完成后会主动调用回调函数:

  • 对于读操作,当有流可读取时,操作系统会将可读的流传入 read 方法的缓冲区

  • 对于写操作,当操作系统将 write 方法传递的流写入完毕时,操作系统主动通知应用程序

在 JDK1.7 中,这部分内容被称作 NIO.2,主要在 Java.nio.channels 包下增加了下面四个异步通道: AsynchronousSocketChannel、AsynchronousServerSocketChannel、AsynchronousFileChannel、AsynchronousDatagramChannel

参考:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java网络编程 的相关文章

  • JNA - EnumProcessModules() 未返回所有 DLL?

    我试图从游戏中读取坐标 当我在通过 OpenProcess 接收的 HANDLE 上使用 ReadProcessMemory 以及我在 CheatEngine 中找到的内存时 效果非常好 例如 如果我知道正在运行的进程中的浮点值是0x5AB
  • Java中RandomAccessFile的并发

    我正在创建一个RandomAccessFile对象通过多个线程写入文件 在 SSD 上 每个线程都尝试在文件中的特定位置写入直接字节缓冲区 并且我确保线程写入的位置不会与另一个线程重叠 file getChannel write buffe
  • 检查发送到网页的请求数

    我正在编写一个 Java 多线程应用程序 它可以访问不同 Web 服务器的数百万个 有时甚至数十亿个 URL 这个想法是检查这些 URL 是否给出有效的 200OK 响应或 404 其他代码 我如何知道我的程序是否不会在他们的服务器上造成高
  • 通过 html tidy 提供渲染 jsp 页面

    我有一个在 Glassfish 上运行的 Java 项目 它会呈现一些难看的 HTML 这是使用各种内部和外部 JSP 库的副作用 我想设置某种渲染后过滤器 通过 HTMLTidy 提供最终的 HTML 这样源代码就很好且整洁 有助于调试
  • Java“空白最终字段可能尚未初始化”方法中抛出异常

    我有一些代码 例如 final int var1 if isSomethingTrue var1 123 else throwErrorMethod int var2 var1 throwErrorMethod 的定义如下 private
  • EL 通过 Scriptlet

    在 JSP 中使用 EL 相对于 scriptlet 的优势是什么 EL 被认为是无脚本语言 EL 使 JSP 免受容易出错原始 Java 代码并强制您根据 MVC 思想编写 JSP EL 或像 JSTL 这样的标签库 不可能实现的任何事情
  • 全静态方法和应用单例模式有什么区别?

    我正在创建一个数据库来存储有关我的网站用户的信息 我正在使用 stuts2 因此使用 Java EE 技术 对于数据库 我将创建一个 DBManager 我应该在这里应用单例模式还是将其所有方法设为静态 我将使用这个 DBManager 进
  • 如何在 Java 中使用 StringUtils?

    我是 Java 初学者 我想用StringUtils replace但 Eclipse 输出 StringUtils cannot be resolved I tried import java lang 但它不起作用 java lang不
  • net.sf.jasperreports.engine.JRRuntimeException:java.io.IOException:无法读取字体数据

    我正在尝试通过 JasperReport 创建 PDF 报告 但读取字体数据时出现问题 我有 jasperreports extension properties 和 ClassPath 中的相关 TTF 文件 这是错误 java io I
  • 为什么下面代码的输出是Thread[main,5,main]

    public class test1 public static void main String args TODO Auto generated method stub Thread t Thread currentThread Sys
  • 为什么我要使用责任链而不是 switch 语句

    考虑一下您已经获得了多次验证 仅当要检查的对象属于某种类型时 这些验证才应生效 为什么我要使用责任链而不是 switch 语句 责任链示例 public class Executor Inject private ValidatorFact
  • 使用 Box2d(适用于 Android)进行碰撞检测?

    有人可以解释一下使用 box2d for android 进行碰撞检测的工作原理吗 我无法理解 BBContactListener 以什么方式工作 BBContactListener listener new BBContactListen
  • JFrame Glasspane 也优于 JDialog,但不应该

    我有一个带有 Glasspane 的 JFrame 未装饰 该框架打开一个 JDialog 也未装饰 也有一个 glassPane 并隐藏自身 setVisible false Glasspanes 通过 setGlassPane 设置 对
  • Java LRU 缓存使用 LinkedList

    堆栈溢出的新手 所以请不要介意我以菜鸟的方式问这个问题 我正在尝试使用链表实现 LRU 缓存 我在这里看到了使用 linkedHashMap 和其他数据结构的其他实现 但对于这种情况 我正在尝试使用链表创建最佳优化版本 正如我在技术期间被问
  • 春季MVC。方法参数字段的默认值

    我有一个带有方法测试的简单控制器 RequestMapping produces application json ResponseBody public HttpEntity
  • Java 中更高级的泛型

    假设我有以下课程 public class FixExpr Expr
  • Java 验证日期为 yyyyMMddHHmmss

    我想在java中验证给定的日期格式为yyyyMMddHHmmss 状况 应符合格式 yyyyMMddHHmmss 它应该验证当前日期 它应该验证与当前小时有 3 小时或 3 小时差异的小时数 如果满足所有三个条件 Java 方法应返回 tr
  • ASTParser:解析绑定后查找声明节点

    我创建了一个启用了绑定的 AST 当我稍后解析绑定时 我得到了一个有效的 ITypeBinding 但是 当我想要获取绑定的声明 Node 时 它 总是返回 null 除非 ITypeBinding 在 sourceFile 中声明 这是我
  • Lucene/Hibernate 搜索锁定异常

    我使用 Hibernate Search 在 Web 应用程序上索引和全文搜索项目 没有问题 来自我的 pom xml
  • 使用正则表达式匹配阿拉伯文文本

    我试图使用正则表达式仅匹配阿拉伯语文本 但出现异常 这是我的代码 txt matches P Arabic 这是例外情况 线程 main 中的异常 java util regex PatternSyntaxException 索引 9 附近

随机推荐

  • node调用谷歌翻译Api,实现自动国际化

    原因 项目国际化过程繁琐 每次都需要人工去google翻译 导致工作效率不高 需求 1 减少人工的重复劳动 提高工作效率 2 使用脚本调用谷歌翻译接口自动化翻译 3 free 作为程序员肯定接受不了付费服务 找方法解决限制 前期准备 1 谷
  • Java笔记16——优先级队列(堆)

    目录 概念 基于二叉树的堆 二叉堆 基于动态数组ArrayList实现的最大堆 向堆中添加元素 siftUp 在堆中取出最大值 最大堆 3 heapify 堆化 在Java中比较两个元素的大小关系 基于堆的优先级队列到底如何实现 有啥应用
  • 0x00007FFEBAD050D8 处(位于 first.exe 中)有未经处理的异常: Microsoft C++ 异常: cv::Exception,位于内存位置 0x0000000DD73CE

    有些时候会出现这种异常 看了网上的一些解释 说有可能是lib文件添置有问题 另一种是路径用成了 我这里给出一种新的可能 那就是图片格式转换错误 这里已经将文件定义为了灰度图像 但是后面使用cvtColor 函数时又将图片转为灰度图像 导致错
  • visio画扇形

    步骤1 首先 选择文件 选项 打开开发工具 步骤2 添加圆形 步骤3 添加两条线 形成扇形的两条边 步骤4 选中圆和线条 在开发工具菜单栏选择操作 修剪 步骤5 选中线条和右边这段弧形 依次选择开发工具 操作 连接 步骤6 拖动该扇形 填充
  • 用代码生成炫酷的词云图—《你好,李焕英》

    最近比较火的电影 你好 李焕英 莫名戳中了大家的泪点 应用评论中的一句 妈妈永远比想象中的要爱我们 虽然我没哭 但看大家都哭了 说明电影不在于多有深意 而是能引起大家共鸣的电影 才是好电影 完全瞎编的 下面我们就来看一下 你好 李焕英 在豆
  • 盛科交换机配置命令_cisco2960交换机 清除配置的命令

    cisco2960交换机 清除配置的命令 1 计算机通过COM端口连接到交机 如果电脑不带COM 需要使用USB转串口线连接交换机 通过超级终端调试 2 当开关通电时 按住开关前面的 模式 按钮 用手按住交换机 Mode 按钮 注意超级终端
  • Uploads-labs靶场练习记录8、9、10-绕过服务端检测(文件后缀)

    这是第八关 1 提示显示不可以上传所有可解析的后缀 那就用png配合htaccess转换php吧 2 成功 这是第九关 1 修改大小写发现还是不能上传 2 后缀加上 DATA上传 成功 这是第十关 1 尝试修改为filename phpin
  • 程序编译链接过程

    文章目录 前言 一 预编译阶段 二 编译阶段 三 汇编阶段 四 链接阶段 总结 前言 include
  • 【msvcp140.dll怎么下载】msvcp140.dll丢失的解决方法

    msvcp140 dll文件对一些电脑软件 电脑游戏等程序的正常运行起到关键性作用 对于弹出缺少此类文件的弹窗 用户们很多时候也摸不着头脑 程序明明上次都能正常运行 突然就弹出缺少msvcp140 dll文件的提醒窗口 通过小编此次编辑的文
  • 多益研发岗视频面试

    03 13 这是自己的第三场面试 自认为面试得还可以 90 以上的问题都答对了 但一个星期后通知说挂了 还是会觉得困惑与可惜 今后再巩固各个知识点吧 39min 1 自我介绍 2 项目介绍 3 项目中有什么亮点或者遇到什么难点 4 说说数据
  • 用HTML做一個酷炫的照片墻,效果非常酷炫。

    下面給大家看看效果 想不想要代碼呢 上代碼
  • 关于字符串二进制定点数浮点数与int以及NBCD码转换的问题

    关于字符串二进制定点数浮点数与int以及NBCD码转换的问题 代码 将十进制数字字符串转化为二进制补码 32位 public String intToBinary String numStr int num Integer parseInt
  • API-应用程序编程接口

    API 定义 通俗理解 定义 API Application Programming Interface 应用程序接口 是一些预先定义的接口 如函数 HTTP接口 或指软件系统不同组成部分衔接的约定 1 用来提供应用程序与开发人员基于某软件
  • 《迅为开发板i.MX8MM 学习记录》——【MIPI篇】Linux 应用程序显示一张图片

    文章目录 前言 一 准备工作 1 开发环境 2 文件准备 二 伪代码分析 1 读取图片数据到数组 2 打开DRM设备 创建fb 3 填充framebuffer 三 完整代码 1 app h 2 app c 四 编译与运行 1 编译 2 运行
  • Blender-绑定动画Auto-Rig Pro 重映射动捕FBX 小K结合 的问题以及解决

    基本使用的教程参考 B站教程参考 Blender 零成本动捕 使用Autorig轻松重映射动捕BVH to FBX to MMD VMD 等自定义骨骼 附带IK 小K的FBX文件导入会报错 需要安装 better fbx addon 4 1
  • STL(标准模板库)泛型编程的基础介绍

    STL泛型编程 一 概述 二 STL基本概念 三 STL六大组件 四 STL中容器 算法 迭代器 五 C 11新增的容器种类 六 序列 七 算法 八 迭代器 1 迭代器的种类 2 随机访问迭代器 3 迭代器层次结构 九 概念 改进和模型 十
  • 关于丢失signal的发生过程

    关于条件变量的使用 有一种需要注意的情况 条件变量的原理是 wait函数将当前线程挂起 加入到等待队列中 wake函数将某个或者所有被wait函数挂起的线程复活 所以 wake执行时一定要保证所有该wait的都已经wait了 也就是说被挂在
  • 几十亿工单表,查询优化案例

    前言 之前在某大型保险公司担任技术经理 负责优化话务系统模块 由于系统已经运行10年之久 尤其在话务系统中 沉积了几十亿的话务信息表 业务人员反馈 话务系统历史数据查询部分已经完全查询不动 且数据增量仍然已每天200w 以上 数据库频繁报警
  • 【日常业务开发】Java调用第三方http接口的常用方式

    日常业务开发 Java调用第三方http接口的常用方式 概述 Java调用第三方http接口的方式 通过JDK网络类Java net HttpURLConnection 通过apache common封装好的HttpClient 通过Apa
  • Java网络编程

    博客说明 内容初稿为本人的学习笔记归纳整理 在此基础上加入了相关视频学习 相关书籍的理解 相关文章博客查阅及源码阅读 博客的编写已经尽量做到详尽 但免不了有纰漏和理解不到位的地方 发现博客的任何问题均可联系我 aboutwxf 163 co