Netty 入门实战

2023-10-31

Netty 入门实战

异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

  • Netty 项目旨在为可维护的高性能和高可伸缩性协议服务器和客户端的快速开发提供一个异步事件驱动的网络应用框架和工具。
  • Netty 是一个 NIO 客户机服务器框架,可以快速简单地开发网络应用程序,如协议服务器和客户机。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器的开发。
  • “快速和简单”并不意味着产生的应用程序会受到可维护性或性能问题的影响。Netty 是根据实现许多协议(如 FTP、 SMTP、 HTTP 以及各种二进制和基于文本的遗留协议)的经验而精心设计的。因此,Netty 成功地找到了一种方法来实现简单的开发、性能、稳定性和灵活性。
  • 一些用户可能已经发现了其他声称具有同样优势的网络应用程序框架,您可能想要问是什么使 Netty 与他们如此不同。答案就是它所建立的哲学。Netty 的目的是从第一天开始就在 API 和实现方面为您提供最舒适的体验。它不是什么实实在在的东西,但是当你阅读本指南和玩 Netty 的时候,你会意识到这种哲学会让你的生活变得更加轻松。

依赖

dependencies {
    implementation "io.netty:netty-all:4.1.56.Final"
}

实战

世界上最简单的协议实现不是发送Hello World消息,被服务器接受到返回相应的响应结果。而是服务器接收到消息后直接丢弃,不做任何响应。

丢弃服务器

要实现 DISCARD 协议,您需要做的唯一一件事就是忽略所有接收到的数据。让我们直接从处理程序实现开始,它处理 Netty 生成的 I/O 事件。

// [1]
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
   
    // [2]
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        // 不处理消息,直接释放
        // [3]
        ((ByteBuf) msg).release();
    }

    // [4]
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelInboundHandlerAdapter 实现了接口 ChannelInboundHandler。充当适配器的角色提供了各种可以重写的事件处理程序方法,通过适配器的标准实现方式,可以避免我们自己实现处理程序接口。
  2. 我们可以覆盖channelRead()的事件处理器方法。只要从客户机接收到新数据,就会使用接收到的消息调用此方法。
  3. 为了实现DISCARD 协议,处理程序必须忽略接收到的消息。ByteBuf是一个引用计数的对象,必须通过release()方法来进行释放。通常的事项方式是这样的
// [2]
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        // 不处理消息,直接释放
        // [3]
        //((ByteBuf) msg).release();
        try {
            // 针对消息 msg 进行处理
        } finally {
        	// 释放引用
            ReferenceCountUtil.release(msg);
        }
    }
  1. exceptionCaught()作为异常处理,当 Netty 由于I/O 错误或处理程序实现由于处理事件时抛出的异常而引发异常时,使用 Throwable 调用事件处理程序方法。在大多数情况下,被捕获的异常应该被记录,其相关的通道应该在这里关闭,尽管这个方法的实现可以根据您想要处理的异常情况而有所不同。例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。

启动服务器

public class DiscardServer {
    /**
     * 端口
     */
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        // [1]
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // [2]
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                	// [3]
                    .channel(NioServerSocketChannel.class)
                	// [4]
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                	// [5]
                    .option(ChannelOption.SO_BACKLOG, 128)
                	// [6]
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
			// [7]
            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new DiscardServer(port).run();
    }
}

  1. NioEventLoopGroup 是一个处理I/O操作的多线程事件循环的处理器定义。例子中定义了2个处理器:

    • 第一个通常被称为“ boss” ,接受一个传入的连接。

    • 第二个通常被称为“工人” ,一旦老板接受了连接并注册了与工人接受的连接,就处理接受连接的通信。

  2. ServerBootstrap 是服务器构造的辅助类,一般不推荐此方式进行服务器的创建。

  3. 此处指定NioServerSocketChannel类,用于实例化一个新的Channel来接受传入的连接。

  4. 此处指定的处理程序将始终由新接受的ChannelChannelInitializer作为特殊的处理程序,用于帮助用户配置新的Channel。往往适用于为新的Channel添加一些处理程序来实现更为复杂的应用程序。

  5. option参数设置,支持设置特定的套接字选项。来满足特定的协议需求,如.option(ChannelOption.TCP_NODELAY, true) 来编写TCP/IP 服务协议。

  6. childOptionoption不同之处在于:

    • option 适用于NioServerSocketChannel来接受传入的连接。
    • childOption适用于被父级的ServerChannel接受的Channels
  7. 绑定到指定端口。

模拟通信

  • 调整代码,打印接受的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            // 针对消息 msg 进行处理
            while (in.isReadable()) { // [4]
                System.out.print((char) in.readByte());
                System.out.flush();
            }        
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
  • windows 环境下使用 powershell 输入命令 telnet localhost 8080,进行通信。
  • powershell 终端输入的字符会同步在控制台打印出来。

响应服务器

目前为止,我们只接受但是没有任何响应。一台服务器,通常应该响应该请求。让我们学习如何通过实现ECHO协议向客户端写入响应消息,其中任何接收到的数据都被发送回来。

与前面部分实现的丢弃服务器的唯一区别在于它将接收到的数据发回,而不是将接收的数据输出到控制台。因此,再次修改channelRead()方法是足够的:

参考地址:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

public class EchoServer {
    /**
     * 端口
     */
    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new EchoServer(port).run();
    }
}

通过终端输入telnet localhost 8080后输入英文字符会得到响应,原字符返回。如依次输入abc,终端打印结果:

aabbcc
  • ChannelHandlerConetxt 提供了很多方法让你去触发 IO 事件或操作。这里我们调用 write(object)来逐字的写入接受到的消息。注意,我们不像 DISCARD 例子里的那样,我们没有释放我们收到的消息。这是因为当它被写回到 wire 时,Netty 替我们释放它。
  • ctx.write(Object) 不会让消息发送,它存在于内部缓冲区,通过调用 ctx.flush() 来把消息发送出去,或者,您可以简洁的调用 ctx.writeAndFlush(msg)。

时间服务器

接下来要实现的协议是 TIME 协议。它不同于前面的示例,因为它发送包含32位整数的消息,而不接收任何请求,并在消息发送后关闭连接。在本例中,您将学习如何构造和发送消息,以及如何在完成时关闭连接。

因为我们将忽略任何接收到的数据,但是一旦建立连接就发送消息,所以这次不能使用 channelRead() 方法。相反,我们应该重写 channelActive()方法。代码如下:

服务端

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
        final ByteBuf time = ctx.alloc().buffer(4); // [2]
        time.writeInt(89); //ASCII 10进制,对应 Y
        time.writeInt(105); //ASCII 10进制,对应 i
        final ChannelFuture f = ctx.writeAndFlush(time); // [3]
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // [4]
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 如前所述,当建立连接并准备生成通信量时,将调用 channelActive ()方法。让我们编写一个32位的整数,它表示此方法中的当前时间。

  2. 要发送一个新消息,我们需要分配一个新的缓冲区,其中将包含消息。我们要写一个32位的整数,因此我们需要一个容量至少为4字节的 ByteBuf。通过 ChannelHandlerContext.alloc ()获取当前 ByteBufAllocator 并分配一个新缓冲区。

  3. 像往常一样,我们写入一条构造好的消息。但是,等等,哪里冒险了?我们以前不是叫 java.nio 吗。在 NIO 中发送消息之前使用 ByteBuffer.flip () ?ByteBuf 没有这样的方法,因为它有两个指针: 一个用于读操作,另一个用于写操作。当您将某些内容写入 ByteBuf 而读取器索引不变时,写入器索引会增加。读者索引和写者索引分别表示消息的开始和结束位置。

    • 相比之下,NIO 缓冲区并不提供一种清晰的方法来确定消息内容的开始和结束位置而不调用 flip 方法。当您忘记翻转缓冲区时,您将遇到麻烦,因为不会发送任何内容或错误的数据。这种错误在 Netty 不会发生,因为我们对不同的操作类型有不同的指针。当你习惯了它,你会发现它会让你的生活变得更加轻松。

    • 另一点需要注意的是 ChannelHandlerContext.write () 和 writeAndFlush () 方法返回 ChannelFuture。ChannelFuture 表示尚未发生的 I/O操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在 Netty 都是异步的。例如,下面的代码可能会在发送消息之前关闭连接:

      Channel ch = ...;
      ch.writeAndFlush(message);
      ch.close();
      
    • 因此,您需要在 ChannelFuture 完成之后调用 close ()方法,该方法由 write ()方法返回,并在完成写操作后通知其侦听器。请注意,close () 也可能不会立即关闭连接,而是返回一个 ChannelFuture。

  4. 那么,当写请求完成时,我们如何得到通知呢?这很简单,可以添加一个ChannelFutureListener来监听返回的结果ChannelFuture。在这里,我们创建了一个新的匿名通道 ChannelFutureListener,当操作完成时它会关闭通道。

  • 或者,您可以使用预定义的侦听器简化代码:
f.addListener(ChannelFutureListener.CLOSE);
  • 要测试我们的时间服务器是否正常工作,可以使用 telnet localhost 8080 命令,终端在连接上后,打印消息后直接失去连接:
Yi

遗失对主机的连接。

客户端

与 DISCARD 和 ECHO 服务器不同,我们需要 TIME 协议的客户端,因为人不能将32位二进制数据转换为日历上的日期。在本节中,我们将讨论如何确保服务器正常工作,并学习如何使用 Netty 编写客户机。

  • 调整服务端接受请求并返回时间戳
 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        // 2208988800为1900年1月1日00:00:00~1970年1月1日00:00:00的总秒数
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
  • 客户端接收服务端的响应并转换为时间格式输出
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

public class TimeServer {
    /**
     * 端口
     */
    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 在 Netty,服务器和客户机之间最大也是唯一的区别是使用了不同的 Bootstrap 和 Channel 实现。请看下面的代码:
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new TimeServer(port).run();
    }
}

客户端接收到响应打印结果:

Tue Dec 29 12:01:58 CST 2020

流数据传输

在基于流的传输(如 TCP/IP)中,接收到的数据被存储到套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是一个包队列,而是一个字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将其视为两条消息,而只是将其视为一堆字节。因此,不能保证您所读到的内容与远程对等方所写的内容完全一致。

例如,假设一个操作系统的 TCP/IP 协议栈已经接收了三个数据包:

1

由于基于流的协议的这个一般属性,在应用程序中很有可能以下面的碎片形式读取它们:

2

因此,接收部分,无论是服务器端还是客户端,都应该将接收到的数据碎片整理成一个或多个有意义的帧,应用程序逻辑可以很容易地理解这些帧。在上面的例子中,接收到的数据应该如下所示:

3

第一个解决方案

现在让我们回到 TIME 客户端示例。我们这里也有同样的问题。32位整数是一个非常小的数据量,它不太可能经常被分段。然而,问题在于它可能是支离破碎的,并且随着流量的增加,支离破碎的可能性也会增加。

最简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节都被接收到内部缓冲区。以下是修改后的 TimeClientHandler 实现,它解决了这个问题:

public class TimeClientWithBufferHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buff;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buff = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buff.release();
        buff = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg; // (1)
        buff.writeBytes(m);
        m.release();
        if (buff.readableBytes() >= 4) {
            long currentTimeMillis = (buff.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

  • 有两种生命周期监听方法:handlerAdded() and 及handlerRemoved()
  • 您可以执行任意初始化任务,只要它不长时间阻塞;
  • 首先,所有接收到的数据应该累积成buff
  • 然后,处理程序必须检查buff有足够的数据,在这个例子中是4个字节,然后继续进行实际的业务逻辑,当更多的数据到达时,这个函数会重新调用一个方法,最终所有的4个字节都会被累积;

4

  • 非 4 字节的数据会直接被丢弃掉。

第二种解决方案

尽管第一个解决方案已经解决了 TIME 客户机的问题,但是修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,比如一个可变长度的字段。您的 ChannelInboundHandler 实现将很快变得不可维护。

正如您可能已经注意到的,您可以向 ChannelPipeline 添加多个 ChannelHandler,因此,您可以将一个单片 ChannelHandler 分割为多个模块化的 ChannelHandler,以降低应用程序的复杂性。例如,你可以将 TimeClientHandler 分成两个处理器:

  • TimeDecoder 处理碎片化问题
  • 最初的简单版本 TimeClientHandler

幸运的是,Netty 提供了一个可扩展的类,可以帮助你写出第一个开箱即用的类:

public class TimeDecoder extends ByteToMessageDecoder { // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}
public class TimeClientWithDecoder {

    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); 
            b.group(workerGroup);
            b.channel(NioSocketChannel.class); 
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new TimeDecoder()) // (5)
                            .addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  • ByteToMessageDecoder 使得处理分裂问题变得容易;
  • 每当接收到新数据时,ByteToMessageDecoder利用内部维护的累积缓冲区,调用decode方法来处理新数据;
  • 当累积缓冲区中没有足够的数据时ByteToMessageDecoder什么都不会添加到out缓冲区中。当收到更多的数据时会再次调用decode()
  • 如果decode()将一个数据添加到out, 这意味着解码器成功解码了一条信息,将丢弃累积缓冲区的读取部分。请记住,您不需要解码多个消息,ByteToMessageDecoder将继续调用方法,直到它没什么数据可以放入out了;
  • ChannelPipeline 添加处理程序TimeDecoder来实现数据的分解。

还可以通过以下方式进一步简化解码器:

public class TimeWithReplayingDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readBytes(4));
    }
}
// 同样的,别忘了在 ChannelPipeline 中添加相应的处理程序
 b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            //.addLast(new TimeDecoder())
                            .addLast(new TimeWithReplayingDecoder())
                            .addLast(new TimeClientHandler());
                }
            });

此外,Netty 提供了开箱即用的解码器,使您能够非常容易地实现大多数协议,并帮助您避免最终得到一个不可维护的整体处理程序实现。更详细的例子请参考以下软件包:

对象序列化传输

到目前为止,我们讨论的所有示例都使用 ByteBuf 作为协议消息的主要数据结构。实际的网络通信过程远比上面的时间协议实现的要更复杂,功能也要更加强大,比如我们常用的 Json 序列化传输,如果用 Netty,能否直接传输对象呢?

在 ChannelHandlers 中使用 POJO 的优势是显而易见的;通过分离从处理程序中提取 ByteBuf 信息的代码,您的处理程序变得更加可维护和可重用。在 TIME 协议的客户端和服务器示例中,我们只读取一个32位整数,直接使用 Bytebuf 并不是一个主要问题。但是,您会发现在实现现实世界的协议时有必要进行分离。

首先,我们将我们要传输的时间戳封装成一个简单对象:

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

增加解码器:

public class TimeDecoderWithPojo extends ByteToMessageDecoder { // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        //out.add(in.readBytes(4)); // (4)
        out.add(new UnixTime(in.readUnsignedInt())); // (4)
    }
}

增加处理器:

public class TimeClientHandlerWithPojo extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

和前面一样,设置客户端的处理器:

            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            //.addLast(new TimeDecoder())
                            .addLast(new TimeDecoderWithPojo())
                            .addLast(new TimeClientHandlerWithPojo());
                }
            });

响应结果如下:

5
通过更新的解码器,``TimeClientHandler 不再使用 ByteBuf。

更简单和优雅,对不对?同样的技术也可以应用于服务器端。

首先是消息处理器,负责发送一个时间戳数据作为响应结果:

public class TimeServerHandlerWithPojo extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnixTime unixTime = new UnixTime();
        System.out.println("准备发送:"+ unixTime);
        final ChannelFuture f = ctx.writeAndFlush(unixTime);
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

然后是编码处理器,将Pojo转换为ByteBuf进行传输:


public class TimeServerEncoderHandlerWithPojo extends ChannelOutboundHandlerAdapter {
    // 它是 ChannelOutboundHandler 的一个实现,它将 UnixTime 转换回 ByteBuf。这比编写解码器要简单得多,因为在对消息进行编码时不需要处理数据包碎片和汇编。
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
        // 首先,我们传递原始的 ChannelPromise as-is,这样当编码的数据实际写入到连线时,Netty 将其标记为成功或失败。
        // 其次,我们没有调用 ctx.flush ()。有一个单独的处理程序方法 void flush (ChannelHandlerContext ctx) ,它旨在重写 flush ()操作。
    }
}

最后是服务端ChannelPipeline程序设置:

  bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new TimeServerEncoderHandlerWithPojo())
                                    .addLast(new TimeServerHandlerWithPojo());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

服务端接收到请求发送数据:
6

客户端接收到请求的响应数据:

7

同样的,Netty 也为服务端的消息编码定义了很多拆箱即用的工具类:

public class TimeServerMessageToByteEncoderHandler extends MessageToByteEncoder<UnixTime> {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
        out.writeInt((int) msg.value());
    }
}
// ChannelPipeline 设置
bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //.addLast(new TimeServerEncoderHandlerWithPojo())
                                    .addLast(new TimeServerMessageToByteEncoderHandler())
                                    .addLast(new TimeServerHandlerWithPojo());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

关闭

关闭 Netty 应用程序通常非常简单,只需关闭通过 shutdownly() 创建的所有 EventLoopGroups 即可。它返回一个 Future,当 EventLoopGroup 完全终止并且属于该组的所有通道都已关闭时,它会通知您。(前文示例已演示多次,此处不再赘述。)

源码:https://gitee.com/zacsnz/architectrue-adventure/tree/master/netty-examples/netty-chapter-1

小结

Netty 作为高性能的异步通信框架,提供了很多很多好用的 API。

  • Channel: Channel 接口是 Netty 对网络操作抽象类,它除了包括基本的 I/O 操作,如 bind()、connect()、read()、write() 等。比较常用的Channel接口实现类是NioServerSocketChannel(服务端)和NioSocketChannel(客户端),这两个 Channel 可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。

  • EventLoop: 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。那 Channel 和 EventLoop 直接有啥联系呢?Channel 为 Netty 网络操作(读写等操作)抽象类,EventLoop 负责处理注册到其上的Channel 处理 I/O 操作,两者配合参与 I/O 操作。

  • ChannelFuture: Netty 是异步非阻塞的,所有的 I/O 操作都为异步的。因此,我们不能立刻得到操作是否执行成功,但是,你可以通过 ChannelFuture 接口的 addListener() 方法注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。并且,你还可以通过ChannelFuture 的 channel() 方法获取关联的Channel。

  • ChannelHandler: 息的具体处理器。他负责处理读写操作、客户端连接等事情。

  • ChannelPipeline: ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。当 Channel 被创建时,它会被自动地分配到它专属的ChannelPipeline。我们可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个ChannelHandler ,因为一个数据或者事件可能会被多个 Handler 处理。当一个 ChannelHandler 处理完之后就将数据交给下一个 ChannelHandler 。

  • EventLoopGroup 包含多个 EventLoop(每一个 EventLoop 通常内部包含一个线程),上面我们已经说了 EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。

  • Bootstrap 是客户端的启动引导类/辅助类。

  • ServerBootstrap 客户端的启动引导类/辅助类。

REFERENCES

8

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty 入门实战 的相关文章

  • nrm 安装与使用

    nrm 安装与使用 nrm 是 npm 的镜像源管理工具 可以用来切换 npm 镜像源 安装 nrm 使用 npm 全局安装 npm i g nrm nrm 命令 查看镜像列表 带 号为当前镜像 nrm ls 查看当前所在镜像 nrm cu

随机推荐

  • 【STM32F4】二、I/O引脚的复用和映射

    目录 一 基本概念 1 什么是I O引脚 2 什么是I O引脚的复用 二 如何配置I O引脚复用 1 复用器 GPIOx AFRL GPIOx AFRH 和复用功能 AF 2 程序编写 2 1 打开I O时钟和复用功能对应外设时钟 2 2
  • win7安装ubuntu,如何设置win7为默认启动项

    第一种方法 运行sudo gedit etc default grub 输入密码 将GRUB DEFAULT 0改为GRUB DEFAULT 4 如果没做过其他设置 即启动时win7为第5项 可根据自己情况更改 然后运行sudo updat
  • Android 快速开发框架:推荐10个框架

    一 Afinal 官方介绍 Afinal是一个android的ioc orm框架 内置了四大模块功能 FinalAcitivity FinalBitmap FinalDb FinalHttp 通过finalActivity 我们可以通过注解
  • 免费的HTML5连载来了《HTML5网页开发实例详解》连载(五)图解通过Fiddler加速开发...

    Fiddler是Windows底下最强大的请求代理调试工具 监控任何浏览器的HTTP HTTPS流量 窜改客户端请求和服务器响应 解密HTTPS Web会话 图4 44为Fiddler原理示意图 图4 44 Fiddler原理示意图 Fid
  • 20个基于SpringBoot搭建的开源项目,总有一个你会感兴趣

    前言 SpringBoot一直是开发者比较青睐的一款轻量级框架 他不仅继承了Spring框架原有的优秀特性 而且还通过简化配置来进一步简化了Spring应用的整个搭建和开发过程 现在很多Java系的软件开发都是基于SpringBoot的 这
  • 安秒平衡在单相整流器纹波分析中的应用

    1 占空比表达式 单相PWM整流器的拓扑如图所示 四个开关管S1 S4 采用双极性调制 S1与S4开关信号一样 S2和S3开关信号相同 两者互补 假设调制度表达式 m Vgm Vo S1占空比表达式为D S1 0 5m sinwt 0 5
  • 数据提取-Selenium专治各种顽固性客户端

    说起Selenium 很多人想到的是Selenium用在自动化web测试上 的确 Selenium是一个很好的自动化测试工具 能够实现很多便利的测试功能 其实Selenium也是一款数据抽取的神器 我们知道现在很多网站使用了很多javasc
  • OSI 五层网络模型详解

    OSI网络模型 是指 开放式系统互联通信参考模型 英语 Open System Interconnection Reference Model 缩写为 OSI 简称为OSI模型 OSI model 一种概念模型 由ISO 国际标准化组织 提
  • python 查找指定字符在字符串中的次数(全)

    s neu is very neupk kjneuneu dhsj neu print s count neu n 0 co 0 while s find neu n len s 1 co 1 n s find neu n len s 1
  • 【技术经验分享】计算机毕业设计hadoop+spark知识图谱房源推荐系统 房价预测系统 房源数据分析 房源可视化 房源大数据大屏 大数据毕业设计 机器学习

    创新点 1 支付宝沙箱支付 2 支付邮箱通知 JavaMail 3 短信验证码修改密码 4 知识图谱 5 四种推荐算法 协同过滤基于用户 物品 SVD混合神经网络 MLP深度学习模型 6 线性回归算法预测房价 7 Python爬虫采集链家数
  • 实现线程同步的几种方法

    在多线程程序中 会出现多个线程抢占一个资源的情况 这时间有可能会造成冲突 也就是一个线程可能还没来得及将更改的 资源保存 另一个线程的更改就开始了 可能造成数据不一致 因此引入多线程同步 也就是说多个线程只能一个对共享的资源进行更改 其他线
  • 计算机用户名携带中文路径,Win10 User下的中文用户名改成英文路径操作方法

    导读 Win10 User下的中文用户名改成英文路径的方法 相关电脑教程分享 有很多Win10系统用户在新购买电脑或是新装Win10时采用了中文用户名 这时会发现C User下的文件夹是中文名 但是这样有一个很不好的地方 很多软件安装在个路
  • JavaWeb新留言板系统 23年4月原创

    JavaWeb留言板系统 2023年4月原创 极低价付费提供项目代码 sql文件 配置说明 包运行服务 答疑解惑服务 私信即可 功能概述 JavaWeb留言板系统项目遵循MVC编程模式 基于Servlet Bootstrap MySQL J
  • Git&&Github配置

    1 把现有的 ssh key 都删掉或查找现有的key PS 如果多打空格可能要破环系统 删掉现有的Key rm rf ssh 查找现有的Key 如果没有 bash终端显示如下 No such file or directory 如果已经存
  • 常见Windows Server漏洞处理方法

    常见的几种漏洞 mysql 3306 Oracle 1521 Redis 6379 Tomcat 8080 445 138端口不安全 weblogic Server 7001 3389远程桌面 下面分别讲解几种漏洞解决的方法 一 mysql
  • CH06_第一组重构(下)

    封装变量 Encapsulate Variable 132 曾用名 自封装字段 Self Encapsulate Field 曾用名 封装字段 Encapsulate Field let defaultOwner firstName Mar
  • C++的强制类型转换

    关于强制类型转换的问题 很多书都讨论过 写的最详细的是C 之父的 C 的设计和演化 最好的解决方法就是不要使用C风格的强制类型转换 而是使用标准C 的类型转换符 static cast dynamic cast 标准C 中有四个类型转换符
  • 在docker上安装spark

    拉安装好spark的镜像 https hub docker com r bde2020 spark master docker pull bde2020 spark master 安装master节点 docker run name spa
  • SpringBoot 整合shiro框架

    网上有很多整合shiro的博客分享 但是貌似没找到一个完整 并且能够实现的 不是包的问题 就是代码的问题 也可能是自己的问题 或者版本的问题 所以 整理了一版自己已应用的 maven依赖
  • Netty 入门实战

    Netty 入门实战 异步事件驱动的Java开源网络应用程序框架 用于快速开发可维护的高性能协议服务器和客户端 Netty 项目旨在为可维护的高性能和高可伸缩性协议服务器和客户端的快速开发提供一个异步事件驱动的网络应用框架和工具 Netty