Java基础系列:NIO编程

2023-11-05

俗世游子:专注技术研究的程序猿

说在前面的话

聊完了Socket编程,我们来聊一聊关于NIO方面的话题,当然在这里只会介绍用的比较广泛的类/方法,其他用的不多的,就不多介绍了,用到的时候查API就好了

本节我们聊个大概内容,明白该如何使用,等到Netty部分的时候还会重点聊漏掉的部分,比如:

  • 四种网络模型
  • Reactor反应器模式
  • 。。。

Channel和Buffer

前面我们聊到的Socket编程,建立连接之后本质上还是在操作IO流,这种方式属于同步阻塞模型,也就是我们常说的BIO

这种方式的缺点可想而知,所以为了提高效率,在JDK1.4之后,出现了NIO

  • 缓冲区为载体,通过建立Channel来进行数据传输

NIO是非阻塞模型,所谓的非阻塞模型,就是说:

  • 当我们调用read方法的时候,如果此时有数据,则读取到数据并返回
  • 如果没有数据,那么就直接返回,不会阻塞到主线程

而且所有的操作都是基于事件监听的方法

首先我们先聊缓冲区:

Buffer

根据官方介绍,Buffer是一种特定类型的容器,用来存放需要传输的数据,该类属于抽象类,我们可以查看一下它的具体实现类:

Buffer的实现类

从名字上应该能看出它的意思,这里就不多介绍了

后续的例子都是用ByteBuffer为例

重要属性

我要说的是它的特性,其中包含3个非常重要的属性:

  • capacity

该属性表示当前Buffer的容量,当前容量在初始化之后是固定的,不能被修改。

// 创建过程
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 直接指定内容的创建
ByteBuffer byteBuffer = ByteBuffer.wrap("123".getBytes());
// 分配直接内存缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);

三者区别:

  • allocate()只是开辟了一个数组空间
  • wrap()表示直接将数组对象包装到缓冲区中,这样创建下来的对象,limitcapacity都是数组长度,position为0,所以在读取数据的时候不需要调用flip()方法
  • allocateDirect()allocate()的表现方式一样,但是底层实现不同。allocateDirect()是从直接内存中开辟一块空间做缓冲区。

直接内存归属于OS管理的内核空间的内存,可以做到零拷贝

通过查看源码一步步到Buffer中,一定可以看到

// cap就是我们初始化的容量
this.capacity = cap;
  • limit

limit表示限制,也就是说在读取或者写入元素的时候不允许的索引限制,默认情况下,初始化之后,其限制值不会超过容量

// 1024
System.out.println("limit:" + byteBuffer.limit());
  • position

表示读取或者写入后的索引位置,不能超过限制值,否则会抛出BufferOverflowException

byteBuffer.put("11".getBytes());
// 2 插入了2位的数据
System.out.println("position:" + byteBuffer.position());
读写模式

Buffer使用最麻烦的地方在这里,前面说过,Channel是通过Buffer来传输数据的,那么我们可以认定Buffer为中心点

  • 当向Bufferput()的时候,称为写模式,而且默认情况下就是写模式,可以通过clear()来进行模式转换
byteBuffer.clear();
  • 当从Bufferget()的时候,称为读模式,我们通过flip()来转换
byteBuffer.flip();

下面我用一个实验来查看一下:

public static void main(String[] args) {
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    byteBuffer.put("123456".getBytes());

    print("原始", byteBuffer);
    byteBuffer.flip();
    print("flip后", byteBuffer);
    byteBuffer.clear();
    print("clear后", byteBuffer);

}

private static void print(String tag, ByteBuffer byteBuffer) {
    System.out.println(tag + ":position:" + byteBuffer.position());
    System.out.println(tag + ":limit:" + byteBuffer.limit());
    System.out.println("==============================");
}

输出结果

原始:position:6
原始:limit:1024
==============================
flip后:position:0
flip后:limit:6
==============================
clear后:position:0
clear后:limit:1024
==============================
重要结论

通过分析上面的输出,我们可以得出以下结论:

  • 默认情况下,Buffer是写模式,而且postion表示写入数据的位置,limit表示最大限制

  • 当我们调用flip()转换模式之后,position置为0,表示从第0位读取,limit置为原始position,说明我们在读取的时候,不能超过上次写入的长度,而且如果我们在这里put()的话,也会覆盖掉之前的数据

  • 当调用clear()之后,再次恢复成写模式,虽然position还是0,但是之前写入的数据并不会消失,只有在重新put()之后才会覆盖

  • 所以,clear()flip()方法修改的只是positionlimit,不会修改实际存储数组,这里大家不要混淆

还是推荐大家亲自试一试,每一个细节点大家都看看那两个属性的变化

方法整理

上面其实也了解到不少方法,下面还有一些方法我们用到的也比较频繁:

array()
byteBuffer.flip();
final byte[] array = byteBuffer.array();
System.out.println(new String(array, 0, byteBuffer.limit()));
byteBuffer.clear();

该方法会直接返回最大容量的数组,所以我们在读取的时候,最好能转换成读模式

putXXX()

ByteBuffer除了可以写入字节数组外,还可以写入其他类型的数据,包括:

image-20210305112923066

get也是同理,这里就不写了

Channel

Buffer配合使用的通道,可以理解为一个通道就是一个连接。根据不同的传输协议,有不同的Channel实现:

FileChannel

这是专门用来操作文件的通道,这里要注意一点,FileChannel阻塞模式的通道

开启通道
// 第一种启动方式
Path path = Paths.get("文件路径");
final FileChannel fileChannel = FileChannel.open(path);

// 通过流来启动
final FileChannel fileChannel = new FileInputStream("文件路径").getChannel();
force()

在将缓冲区的内容写入到通道的过程,是由操作系统来完成的,处于性能方面考虑,不可能是实时写入,所以为了保证数据最终都真正写入到磁盘上,需要强制刷新

fileChannel.force(true);

其他的和之前的IO没有什么区别,我这里就不写了,给大家写个例子就明白了

基于NIO的文件传输
public class CopyFile {
    public static void main(String[] args) throws IOException {
        // 得到文件
        final Path source = Paths.get("D:\\Working Directory\\project\\study\\study-java\\src\\main\\java\\zopx\\top\\study\\jav\\_nio\\CopyFile.java");

        // 读取通道: 通过open打开
        final FileChannel sourceChannel = FileChannel.open(source);
        // 写出通道:  通过流打开
        final FileChannel targetChannel = new FileOutputStream(new File("D:\\Working Directory\\project\\study\\study-java\\src\\main\\java\\zopx\\top\\study\\jav\\_nio\\_CopyFile.txt")).getChannel();

        // --------------------
        // 定义ByteBuffer
        ByteBuffer bytebuffer = ByteBuffer.allocate(1024);
        // read其实是向bytebuffer中写入数据,所以这里是默认的,不需要转换
        while ((sourceChannel.read(bytebuffer)) != -1) {
            // write其实是从 bytebuffer 中读取数据然后再通过channel发送出去,所以这里需要转换成读模式
            bytebuffer.flip();
            // 将bytebuffer中的数据写出到通道中
            targetChannel.write(bytebuffer);
            // 将数据写出去之后,最好将bytebuffer的模式改成写模式,不然写入数据会出错
            bytebuffer.clear();
        }
        
        // 强制刷新
        targetChannel.force(true);

        // 关闭通道
        sourceChannel.close();
        targetChannel.close();
    }
}

这里还有一种不需要考虑ByteBuffer模式转换的方式

long size = sourceChannel.size();
long pos = 0, count = 0;
while (pos < size) {
    count = size - count < 1024 ? size - count : 1024;
    pos += targetChannel.transferFrom(sourceChannel, pos, count);
}
TCP/IP协议下的Channel

两者成对出现,不想单独写了

基于网络传输的通道,分别表示为:

  • ServerSocketChannel
  • SocketChannel

和之前的ServerSocket/Socket意思是一样的

服务端开启通道
// 开启通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 绑定端口
ssc.bind(new InetSocketAddress(8888));
客户端开启通道
// 连接到服务端
SocketChannel sc = 
    SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));

// 也可以通过调用connect()方法来实现连接到服务端
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888))

下面两边的方法都是一样的,就以服务端为例

主要配置项
ssc.configureBlocking(false);

该配置表示设置通道的阻塞模式,如果:

  • 设置为true,表示为阻塞模型,后续的read/write方法都是阻塞的,就和面向流的操作效率一样
  • 设置为false,表示为非阻塞模式
注册选择器
Selector selector = Selector.open();
// 注册接收事件
ssc.register(selector, SelectionKey.OP_ACCEPT);

如果说一个通道代表一个连接,那么在非阻塞模式下,通过选择器可以监控多个连接的IO状况,可以说选择器和通道是监控被监控的关系

通道IO事件

表示通道具备完成某种IO操作的条件,包含以下几种事件:

  • SelectionKey.OP_ACCEPT

就绪可接收

  • SelectionKey.OP_CONNECT

可连接

  • SelectionKey.OP_READ

可读

  • SelectionKey.OP_WRITE

可写

轮询感兴趣的IO事件
  • 就绪状态的查询:通过选择器的select(),查询注册过的所有socket的就绪状态,当有任何一个注册过的socket中的数据就绪,那么就将其添加到就绪的列表中
  • 然后我们通过循环处理
while (selector.select() > 0) {
    final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
        final SelectionKey selectionKey = iterator.next();
        // 防止重复处理
        iterator.remove();

        if (selectionKey.isAcceptable()) {
            // 接收
        } 
        if (selectionKey.isConnectable()) {
            // 连接
        }
        if (selectionKey.isReadable()) {
            // 可读
        }
        if (selectionKey.isWritable()) {
            // 可写
        }
    }
}       

基本操作就是这样,接下里就是通过read()write()操作Buffer,还是已一个实际案例来完成

一定要注意Buffer模式之间的转换

基于NIO的点对点聊天
  • 服务端
public class ChatServer {

    /**
     * 绑定端口号
     */
    public static int PORT = 8888;

    private static Map<String, SocketChannel> TOKEN_SOCKET_MAP = new HashMap<>();
    private static Map<SocketChannel, String> SOCKET_TOKEN_MAP = new HashMap<>();


    public static void main(String[] args) {
        try {
            new ChatServer().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    ServerSocketChannel ssc = null;

    private void start() throws IOException {
        // 开启NIO的ServerSocket通道
        ssc = ServerSocketChannel.open();
        // 绑定端口
        ssc.bind(new InetSocketAddress(PORT));

        //设置为非阻塞式,这里是NIO的关键点
        ssc.configureBlocking(false);

        // 选择器
        Selector selector = Selector.open();
        // 注册接收事件
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        // 轮询注册事件
        while (selector.select() > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 移除掉,避免重复处理
                iterator.remove();

                if (selectionKey.isAcceptable()) {
                    // 获取连接过来的客户端
                    SocketChannel accept = ssc.accept();

                    if (null == accept)
                        continue;

                    System.out.println(accept);

                    // 维护Token
                    String token = UUID.randomUUID().toString();
                    TOKEN_SOCKET_MAP.put(token, accept);
                    SOCKET_TOKEN_MAP.put(accept, token);

                    // 设置客户端为非阻塞式并注册可写事件
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_WRITE);
                }
                // 如果是读取事件
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();

                    ByteBuffer buffer = ByteBuffer.allocate(1024);

                    // 读取数据
                    StringBuilder sb = new StringBuilder();
                    while ((channel.read(buffer)) > 0) {
                        buffer.flip();
                        String msg = new String(buffer.array(), 0, buffer.limit());
                        sb.append(msg);
                        buffer.clear();
                    }

                    if (sb.length() > 0) {
                        System.out.println("-----读取数据:" + sb.toString());

                        writeMsg(channel, sb.toString());
                    }
                }
                // 如果是写出事件
                if (selectionKey.isWritable()) {

                    // 将生成的token返回给客户端
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    String content = getTokenBySocket(channel) + ":" + getTokenBySocket(channel);

                    if (null != channel) {
                        send(channel, content);
                        channel.register(selector, SelectionKey.OP_READ);
                    }
                }
            }
        }
    }

    /**
     * 处理数据并发送
     * @param channel 当前channel
     * @param msg 发送内容
     * @throws IOException IOException
     */
    private void writeMsg(SocketChannel channel, String msg) throws IOException {
        if (null == msg || "".equals(msg)) return;

        String[] split = msg.split(":");
        SocketChannel targetChannel = getSocketByToken(split[0]);
        String content = getTokenBySocket(channel) + ":" + split[1];

        send(targetChannel, content);
    }

    /**
     * 通过token找到SocketChannel
     *
     * @param token token
     * @return SocketChannel
     */
    public SocketChannel getSocketByToken(String token) {
        return TOKEN_SOCKET_MAP.get(token);
    }

    /**
     * 通过通道找到Token
     *
     * @param channel 通道
     * @return String
     */
    public String getTokenBySocket(SocketChannel channel) {
        return SOCKET_TOKEN_MAP.get(channel);
    }

    /**
     * 发送消息
     * 发送者token: 发送内容
     *
     * @param channel 通道
     * @param msg     消息
     * @throws IOException IO异常
     */
    public void send(SocketChannel channel, String msg) throws IOException {
        System.out.println("----发送的消息#" + msg);
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        channel.write(buffer);
    }
}

  • 客户端
public class ChatClient {

    private static Charset charset = StandardCharsets.UTF_8;

    public static void main(String[] args) throws IOException {
        // 开启SocketChannel
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", ChatServer.PORT));
        // 配置非阻塞
        socketChannel.configureBlocking(false);

        new Thread(() -> {
            try {
                Selector selector = Selector.open();
                socketChannel.register(selector, SelectionKey.OP_READ);

                while (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();

                        if (selectionKey.isReadable()) {
                            SocketChannel channel = (SocketChannel) selectionKey.channel();

                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            StringBuilder sb = new StringBuilder();
                            while ((channel.read(byteBuffer)) > 0) {
                                byteBuffer.flip();
                                String msg = new String(byteBuffer.array(), 0, byteBuffer.limit());
                                sb.append(msg);
                                byteBuffer.clear();
                            }
                            if (!"".equals(sb.toString())) {
                                System.out.println(sb.toString());
                            }
                            // 设置可读事件
                            selectionKey.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (Exception e) {

            }
        }).start();

        Scanner scanner = new Scanner(System.in);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (scanner.hasNext()) {
            String next = scanner.next();
            System.out.println("----想要发送的数据:" + next);
            buffer.put(next.getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
        }
    }
}

代码实际测过,按照token:消息体的方式发送消息

demo代码,很多地方没有优化到,如果感兴趣可以自己尝试在这个基础上更新

最后的话

到此就全部结束啦,这里没有介绍DatagramChannel,该类属于UDP协议

同样的,很多其它API方法也没有聊到,还是给出文档,用到的时候现查就可以了:

NIO Buffer

NIO Channel

NIO File

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java基础系列:NIO编程 的相关文章

随机推荐

  • mongo: error while loading shared libraries: libcrypto.so.10: cannot open shared object file: No suc

    问题 mongo error while loading shared libraries libcrypto so 10 cannot open shared object file No such file or directory 原
  • 使用jQuery为表单设置值

    jQuery是一个流行的JavaScript库 它简化了JavaScript的编写和操作 在编程中 我们经常需要为表单元素设置值 jQuery提供了简单而强大的方法来实现这个目标 在本文中 我将向您展示如何使用jQuery为表单设置值 并提
  • 通过群晖 DSM 的 “装载远程文件夹(CIFS)” 功能来装载局域网中的 Win7 主机的指定文件或盘符...

    今天下午有空 就整理下电脑和 DSM 里的文件和资料 于是想把电脑里的几个盘 映射 到 DSM 的 File Station 中 以方便数据的搬移 这个 映射 就是通过 DSM 里 File Station 的 装载远程文件夹 来实现的 早
  • 性能测试连载 (9)-压测实战分析性能拐点

    咨询微信 uhz2008 概述 本文对百度进行一次实战压测 验证一下理论知识 分析一下性能拐点 操作 第一次实验 200并发 并发200 不限迭代次数 同时在请求下面加RPS定时器 目的是在200线程下 将RPS逐步增加到1000 S 并持
  • Linux磁盘挂载

    概念 挂载 所谓挂载就是利用一个目录当成进入点 将磁盘分区的数据放在该目录下 也就是说 进入该目录就可以读取该分区的意思 Linux系统最重要的是根目录 因此根目录一定需要挂载到某个分区 至于其他的目录则根据需要挂载到不同的分区 假设硬盘分
  • 推荐一款我最近爱上的网页版文库(编辑器)——语雀yuque.com

    推荐一款由于工作接触然后热爱上的文库 语雀 语雀 https www yuque com 一开始它是阿里内部的文档库 后来随着它的功能升级 升级 升级 我恨不得把自己以前散落在各地的笔记 各种文件都搬上去 甚至最近已经弃用axureRP和s
  • 微信二次分享链接,出现config:invalid signature错误的解决方法

    当开发微信时需要做特定的页面做分享时 根据官方提供的jssdk php文件创建的签名数据包调试时 大家碰到的最多的错误而且解决最麻烦的大概就是signature错误了 分享时提示错误 errMsg config invalid signat
  • 单链表的各种操作,对于初学者来说,更容易理解

    include
  • Oracle11g 数据库提示ORA-28002: the password will expire within

    Oracle11g 数据库提示ORA 28002 the password will expire within 7 days 是说密码过期 oracle11g中默认在default概要文件中设置了 PASSWORD LIFE TIME 1
  • 批量修改mathtype中的公式字体、大小

    目录 一 保存格式文件 二 批量修改 一 保存格式文件 随便双击打开一个公式 点击菜单栏大小 定义 然后根据自己的需求修改参数 修改完成后点击确定 点击菜单栏预设 公式预设 保存到文件 选择一个保存的位置并命名 二 批量修改 下面就对全文公
  • 怎么导入别人的项目运行部署搭建Java项目springboot代码eclipse idea的心得实操

    调过无数代码 就发现没有运行不了的代码 这些代码也是我实践经验的来源 从最开始的servlet 到现在的springcloud 每一种类型的项目都有其特定的导入部署方式 如果是idea或者eclipse 按如下操作 按项目类型分两种情况部署
  • 内网穿透的应用-如何搭建WordPress博客网站,并且发布至公网上?

    文章目录 如何搭建WordPress博客网站 并且发布至公网上 概述 前置准备 1 安装数据库管理工具 1 1 安装图形图数据库管理工具 SQL Front 2 创建一个新数据库 2 1 创建数据库 2 2 为数据库创建一个用户 3 安装P
  • 如何优雅的使用fbx sdk

    include 头文件 设置lib目录 添加libfbxsdk lib 复制粘贴dll文件 运行平台一定要 大写加粗 一定要 设置为x64 添加宏FBXSDK SHARED 否则会出各种莫名其妙的问题
  • 【java基础】HashMap源码解析

    文章目录 基础说明 构造器 put方法 无扩容 无冲突 put方法 无冲突 有扩容 put方法 有冲突 无树化 put方法 有冲突 树化 remove方法 树退化 常见方法 总结 基础说明 HashMap 是一个散列表 它存储的内容是键值对
  • Go语言学习15-基本流程控制

    基本流程控制 流程控制对比 Go 和 C 基本流程控制 1 代码块和作用域 2 if 语句 3 switch语句 3 1 表达式switch语句 3 2 类型switch语句 4 for 语句 4 1 for 子句 4 2 range 子句
  • 什么是三极管的倒置状态及其作用!

    1 什么是三极管的倒置状态 集电结正偏 发射结反偏 为倒置状态 集电结正偏 发射结正偏 为饱和状态 集电结反偏 发射结反偏 为倒截止态 集电结反偏 发射结正偏 为放大状态 2 对三极管倒置状态的分析 实际上 当NPN型三极管的三个电极电位关
  • BUUCTF-Reverse:SimpleRev(算法分析题)

    题目地址 https buuoj cn challenges SimpleRev 查壳 得知消息 ELF 64 直接拖进ida64分析 int cdecl noreturn main int argc const char argv con
  • h5和APP实现交互(安卓-ios)

    h5调用APP 的方法 taskCallback是和APP端协议的函数名 安卓 window PlatformCurrency taskCallback 测试数据 ios window webkit messageHandlers task
  • 使用ffmpeg合并多个视频文件

    由于腾讯视频将一个视频分割成多个20M左右的小文件 所以必须合并起来成为一个完整视频文件 用什么工具来合并这些文件呢 想到了已经安装好的ffmpeg 开源免费 又是现成的 两种方法 方法1 直接写文件名 使用 来分割 ffmpeg i co
  • Java基础系列:NIO编程

    俗世游子 专注技术研究的程序猿 说在前面的话 聊完了Socket编程 我们来聊一聊关于NIO方面的话题 当然在这里只会介绍用的比较广泛的类 方法 其他用的不多的 就不多介绍了 用到的时候查API就好了 本节我们聊个大概内容 明白该如何使用